You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/06/19 12:00:44 UTC
git commit: FALCON-263 API to get workflow parameters. Contributed by
pavan kumar kolamuri
Repository: incubator-falcon
Updated Branches:
refs/heads/master b68946a02 -> 1be98326b
FALCON-263 API to get workflow parameters. Contributed by pavan kumar kolamuri
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1be98326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1be98326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1be98326
Branch: refs/heads/master
Commit: 1be98326bd6ce1658bf8a9db592bd9b6a768e8b2
Parents: b68946a
Author: Shwetha GS <sh...@inmobi.com>
Authored: Thu Jun 19 15:30:36 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Thu Jun 19 15:30:36 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/falcon/cli/FalconCLI.java | 15 ++++++--
.../org/apache/falcon/client/FalconClient.java | 23 +++++++++++-
.../apache/falcon/resource/InstancesResult.java | 7 ++++
.../workflow/engine/AbstractWorkflowEngine.java | 3 ++
docs/src/site/twiki/FalconCLI.twiki | 7 ++++
.../workflow/engine/OozieWorkflowEngine.java | 38 ++++++++++++++++----
.../resource/AbstractInstanceManager.java | 24 +++++++++++++
.../resource/proxy/InstanceManagerProxy.java | 21 +++++++++++
.../apache/falcon/resource/InstanceManager.java | 14 ++++++++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 10 ++++++
11 files changed, 153 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b007e2..f72c1c4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
FALCON-280 Validate the ACL in Feed entity with the user submitting the entity
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index c51c3c0..753c39f 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -93,6 +93,7 @@ public class FalconCLI {
public static final String CURRENT_COLO = "current.colo";
public static final String CLIENT_PROPERTIES = "/client.properties";
public static final String LIFECYCLE_OPT = "lifecycle";
+ public static final String PARARMS_OPT = "params";
// Graph Commands
public static final String GRAPH_CMD = "graph";
@@ -241,6 +242,9 @@ public class FalconCLI {
result = client.rerunInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(LOG_OPT)) {
result = client.getLogsOfInstances(type, entity, start, end, colo, runid, lifeCycles);
+ } else if (optionsList.contains(PARARMS_OPT)) {
+ // start time is the nominal time of instance
+ result = client.getParamsOfInstance(type, entity, start, colo, clusters, sourceClusters, lifeCycles);
} else {
throw new FalconCLIException("Invalid command");
}
@@ -548,6 +552,12 @@ public class FalconCLI {
"Logs print the logs for process instances for a given process in "
+ "the range start time and optional end time");
+ Option params = new Option(
+ PARARMS_OPT,
+ false,
+ "Displays the workflow parameters for a given instance of specified nominal time");
+
+
OptionGroup group = new OptionGroup();
group.addOption(running);
group.addOption(status);
@@ -559,10 +569,12 @@ public class FalconCLI {
group.addOption(rerun);
group.addOption(logs);
group.addOption(continues);
+ group.addOption(params);
Option url = new Option(URL_OPTION, true, "Falcon URL");
Option start = new Option(START_OPT, true,
- "Start time is required for commands, status, kill, suspend, resume and re-run");
+ "Start time is required for commands, status, kill, suspend, resume and re-run"
+ + "and it is nominal time while displaying workflow params");
Option end = new Option(
END_OPT,
true,
@@ -592,7 +604,6 @@ public class FalconCLI {
"describes life cycle of entity , for feed it can be replication/retention "
+ "and for process it can be execution");
-
instanceOptions.addOption(url);
instanceOptions.addOptionGroup(group);
instanceOptions.addOption(start);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index beecc0f..ed0a0ba 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -201,7 +201,8 @@ public class FalconClient {
RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON),
RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON),
LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
- SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
private String method;
@@ -424,6 +425,16 @@ public class FalconClient {
return sendInstanceRequest(Instances.LOG, type, entity, start, end,
null, runId, colo, lifeCycles);
}
+
+ public String getParamsOfInstance(String type, String entity,
+ String start, String colo,
+ String clusters, String sourceClusters,
+ List<LifeCycle> lifeCycles)
+ throws FalconCLIException, UnsupportedEncodingException {
+
+ return sendInstanceRequest(Instances.PARAMS, type, entity,
+ start, null, null, null, colo, lifeCycles);
+ }
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
public String getThreadDump() throws FalconCLIException {
@@ -611,6 +622,7 @@ public class FalconClient {
}
}
+
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException {
@@ -742,6 +754,15 @@ public class FalconClient {
toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-";
sb.append(toAppend).append("\n");
+ if (instance.getWfParams() != null) {
+ Map<String, String> props = instance.getWfParams();
+ sb.append("Workflow params").append("\n");
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+ }
+ sb.append("\n");
+ }
+
}
}
sb.append("\nAdditional Information:\n");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index da0ccc5..c3c93f2 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -19,8 +19,10 @@
package org.apache.falcon.resource;
import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;
+import java.util.Map;
/**
* Pojo for JAXB marshalling / unmarshalling.
@@ -99,6 +101,9 @@ public class InstancesResult extends APIResult {
@XmlElement
public InstanceAction[] actions;
+ @XmlElementWrapper(name="params")
+ public Map<String, String> wfParams;
+
public Instance() {
}
@@ -144,6 +149,8 @@ public class InstancesResult extends APIResult {
return details;
}
+ public Map<String, String> getWfParams() { return wfParams; }
+
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index c28cb03..eedd81f 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -94,4 +94,7 @@ public abstract class AbstractWorkflowEngine {
public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
+
+ public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index dbb6981..2c5d9c5 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -172,6 +172,13 @@ This can be used with instance management options. Default values are replicatio
Usage:
$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -status -lifecycle <<lifecycletype>> -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
+---+++Params
+
+Displays the workflow params of a given instance. Where start time is considered as nominal time of that instance.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -start "yyyy-MM-dd'T'HH:mm'Z'"
+
---++ Graphs Options
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7166b13..eb0c213 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -44,6 +44,7 @@ import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.OozieWorkflowBuilder;
import org.apache.falcon.workflow.WorkflowBuilder;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
@@ -60,6 +61,7 @@ import org.apache.oozie.client.rest.RestConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -481,8 +483,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return doSummaryJobAction(entity, start, end, null, lifeCycles);
}
+ @Override
+ public InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
+ }
+
private static enum JobAction {
- KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY
+ KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
}
private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException {
@@ -536,6 +544,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
instance.endTime = jobInfo.getEndTime();
instance.logFile = jobInfo.getConsoleUrl();
instance.sourceCluster = sourceCluster;
+ if (action == JobAction.PARAMS) {
+ instance.wfParams = getWFParams(jobInfo);
+ }
}
instance.details = coordinatorAction.getMissingDependencies();
instances.add(instance);
@@ -624,6 +635,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return instancesSummaryResult;
}
+ private Map<String, String> getWFParams(WorkflowJob jobInfo) {
+ Map<String, String> wfParams = new HashMap<String, String>();
+ Configuration conf = new Configuration(false);
+ conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+ for (Map.Entry<String, String> entry : conf) {
+ wfParams.put(entry.getKey(), entry.getValue());
+ }
+ return wfParams;
+ }
+
private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary) {
List<CoordinatorAction> actions = coordJob.getActions();
@@ -689,6 +710,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
case STATUS:
break;
+ case PARAMS:
+ break;
+
default:
throw new IllegalArgumentException("Unhandled action " + action);
}
@@ -795,10 +819,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
boolean retentionCoord = isRetentionCoord(coord);
Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
- Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
+
Date iterEnd = ((nextMaterializedTime.before(end) || retentionCoord) ? nextMaterializedTime : end);
+ Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
+ endCal.setTime(EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, iterEnd));
+ endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.valueOf((coord.getFrequency()))));
- while (iterStart.before(iterEnd)) {
+ while (start.compareTo(endCal.getTime()) <= 0) {
if (retentionCoord) {
if (retentionInstancesCount >= maxRetentionInstancesCount) {
break;
@@ -806,13 +833,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
retentionInstancesCount++;
}
- int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterEnd);
+ int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, endCal.getTime());
String actionId = coord.getId() + "@" + sequence;
addCoordAction(client, actions, actionId);
- Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
- endCal.setTime(iterEnd);
endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.valueOf((coord.getFrequency()))));
- iterEnd = endCal.getTime();
}
}
actionsMap.put(cluster, actions);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 3b87469..0df81cd 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -157,6 +157,30 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
+ public InstancesResult getInstanceParams(String type,
+ String entity, String startTime,
+ String colo, List<LifeCycle> lifeCycles) {
+ checkColo(colo);
+ checkType(type);
+ try {
+ lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
+ if (lifeCycles.size() != 1) {
+ throw new FalconException("For displaying wf-params there can't be more than one lifecycle "
+ + lifeCycles);
+ }
+ validateParams(type, entity, startTime, null);
+
+ Entity entityObject = EntityUtil.getEntity(type, entity);
+ Date start = EntityUtil.parseDateUTC(startTime);
+ Date end = getEndDate(start, null);
+ AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+ return wfEngine.getInstanceParams(entityObject, start, end, lifeCycles);
+ } catch (Throwable e) {
+ LOG.error("Failed to display params of an instance", e);
+ throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
public InstancesResult killInstance(HttpServletRequest request,
String type, String entity, String startStr,
String endStr, String colo,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 42b4aeb..36ce6c9 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -133,6 +133,27 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
@GET
+ @Path("params/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-params")
+ @Override
+ public InstancesResult getInstanceParams(
+ @Dimension("type") @PathParam("type") final String type,
+ @Dimension("entity") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String start,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ return new InstanceProxy() {
+ @Override
+ protected InstancesResult doExecute(String colo) throws FalconException {
+ return getInstanceManager(colo).invoke("getInstanceParams",
+ type, entity, start, colo, lifeCycles);
+ }
+ }.execute(colo, type, entity);
+ }
+
+
+ @GET
@Path("logs/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-logs")
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index bdf5e1b..cdf703d 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -90,6 +90,20 @@ public class InstanceManager extends AbstractInstanceManager {
return super.getLogs(type, entity, startStr, endStr, colo, runId, lifeCycles);
}
+ @GET
+ @Path("params/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-params")
+ @Override
+ public InstancesResult getInstanceParams(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String start,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+ return super.getInstanceParams(type, entity, start, colo, lifeCycles);
+ }
+
@POST
@Path("kill/{type}/{entity}")
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9471e27..ea40411 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -383,6 +383,11 @@ public class FalconCLIIT {
+ overlay.get("outputFeedName")
+ " -start "+ SchemaHelper.getDateFormat().format(new Date())));
+ Assert.assertEquals(0,
+ executeWithURL("instance -params -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE));
+
}
public void testInstanceRunningAndSummaryCommands() throws Exception {
@@ -418,6 +423,11 @@ public class FalconCLIIT {
executeWithURL("instance -summary -type feed -lifecycle eviction -name "
+ overlay.get("outputFeedName")
+ " -start "+ SchemaHelper.getDateFormat().format(new Date())));
+
+ Assert.assertEquals(0,
+ executeWithURL("instance -params -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE));
}