You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/03/04 08:34:55 UTC
falcon git commit: FALCON-950 Rerun does not work on succeeded
instances. Contributed by Suhas Vasu
Repository: falcon
Updated Branches:
refs/heads/master ccdf02e7e -> fd8614538
FALCON-950 Rerun does not work on succeeded instances. Contributed by Suhas Vasu
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fd861453
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fd861453
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fd861453
Branch: refs/heads/master
Commit: fd861453807246fc3af2c82b137c643dad2a0e1f
Parents: ccdf02e
Author: Suhas Vasu <su...@inmobi.com>
Authored: Wed Mar 4 13:04:40 2015 +0530
Committer: Suhas Vasu <su...@inmobi.com>
Committed: Wed Mar 4 13:04:40 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../java/org/apache/falcon/cli/FalconCLI.java | 16 ++++++++-
.../org/apache/falcon/client/FalconClient.java | 36 ++++++++++++++++----
.../workflow/engine/AbstractWorkflowEngine.java | 2 +-
docs/src/site/twiki/FalconCLI.twiki | 6 ++--
docs/src/site/twiki/restapi/InstanceRerun.twiki | 25 +++++++++++++-
.../workflow/engine/OozieWorkflowEngine.java | 9 +++--
.../resource/AbstractInstanceManager.java | 6 ++--
.../falcon/resource/channel/HTTPChannel.java | 4 +--
.../resource/proxy/InstanceManagerProxy.java | 5 +--
.../apache/falcon/resource/InstanceManager.java | 5 +--
11 files changed, 95 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 21ee6e8..a2a54bf 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -92,6 +92,8 @@ Trunk (Unreleased)
(Suhas vasu)
BUG FIXES
+ FALCON-950 Rerun does not work on succeeded instances (Suhas Vasu)
+
FALCON-1048 Incorrect documentation for feed instacnce listing api. (Suhas
Vasu via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index ac76a9c..92b5347 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -95,6 +95,7 @@ public class FalconCLI {
public static final String NUM_RESULTS_OPT = "numResults";
public static final String NUM_INSTANCES_OPT = "numInstances";
public static final String PATTERN_OPT = "pattern";
+ public static final String FORCE_RERUN_FLAG = "force";
public static final String INSTANCE_CMD = "instance";
public static final String START_OPT = "start";
@@ -286,11 +287,15 @@ public class FalconCLI {
} else if (optionsList.contains(RERUN_OPT)) {
validateNotEmpty(start, START_OPT);
validateNotEmpty(end, END_OPT);
+ boolean isForced = false;
+ if (optionsList.contains(FORCE_RERUN_FLAG)) {
+ isForced = true;
+ }
result =
ResponseHelper.getString(client
.rerunInstances(type, entity, start, end, filePath, colo,
clusters, sourceClusters,
- lifeCycles));
+ lifeCycles, isForced));
} else if (optionsList.contains(LOG_OPT)) {
validateOrderBy(orderBy, instanceAction);
validateFilterBy(filterBy, instanceAction);
@@ -355,6 +360,12 @@ public class FalconCLI {
throw new FalconCLIException("Invalid argument: sourceClusters");
}
}
+
+ if (optionsList.contains(FORCE_RERUN_FLAG)) {
+ if (!optionsList.contains(RERUN_OPT)) {
+ throw new FalconCLIException("Force option can be used only with instance rerun");
+ }
+ }
}
private void entityCommand(CommandLine commandLine, FalconClient client)
@@ -797,6 +808,8 @@ public class FalconCLI {
"Start returning instances from this offset");
Option numResults = new Option(NUM_RESULTS_OPT, true,
"Number of results to return per request");
+ Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
+ "Flag to forcefully rerun entire workflow of an instance");
instanceOptions.addOption(url);
instanceOptions.addOptionGroup(group);
@@ -815,6 +828,7 @@ public class FalconCLI {
instanceOptions.addOption(orderBy);
instanceOptions.addOption(sortOrder);
instanceOptions.addOption(numResults);
+ instanceOptions.addOption(forceRerun);
return instanceOptions;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 86397c4..a866bb0 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -461,7 +461,8 @@ public class FalconClient {
public InstancesResult rerunInstances(String type, String entity, String start,
String end, String filePath, String colo,
- String clusters, String sourceClusters, List<LifeCycle> lifeCycles)
+ String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+ Boolean isForced)
throws FalconCLIException, IOException {
StringBuilder buffer = new StringBuilder();
@@ -480,7 +481,7 @@ public class FalconClient {
}
String temp = (buffer.length() == 0) ? null : buffer.toString();
return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
- getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles);
+ getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced);
}
public InstancesResult getLogsOfInstances(String type, String entity, String start,
@@ -605,7 +606,8 @@ public class FalconClient {
String start, String end, String runId, String colo,
String fields, String filterBy, String tags,
String orderBy, String sortOrder, Integer offset,
- Integer numResults, Integer numInstances, String searchPattern) {
+ Integer numResults, Integer numInstances, String searchPattern,
+ Boolean isForced) {
if (!StringUtils.isEmpty(fields)) {
resource = resource.queryParam("fields", fields);
@@ -647,6 +649,9 @@ public class FalconClient {
if (!StringUtils.isEmpty(searchPattern)) {
resource = resource.queryParam("pattern", searchPattern);
}
+ if (isForced != null) {
+ resource = resource.queryParam("force", String.valueOf(isForced));
+ }
return resource;
}
@@ -664,7 +669,7 @@ public class FalconClient {
resource = addParamsToResource(resource, start, end, null, null,
fields, filterBy, filterTags,
orderBy, sortOrder,
- offset, numResults, numInstances, null);
+ offset, numResults, numInstances, null, null);
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -734,16 +739,35 @@ public class FalconClient {
.getEntity(InstancesResult.class);
}
+ private InstancesResult sendInstanceRequest(Instances instances, String type,
+ String entity, String start, String end, InputStream props,
+ String runid, String colo, List<LifeCycle> lifeCycles,
+ Boolean isForced) throws FalconCLIException {
+ return sendInstanceRequest(instances, type, entity, start, end, props,
+ runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS, isForced).getEntity(InstancesResult.class);
+ }
+
+
+
private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
String start, String end, InputStream props, String runid, String colo,
List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
Integer offset, Integer numResults) throws FalconCLIException {
+
+ return sendInstanceRequest(instances, type, entity, start, end, props,
+ runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null);
+ }
+
+ private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
+ String start, String end, InputStream props, String runid, String colo,
+ List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
+ Integer offset, Integer numResults, Boolean isForced) throws FalconCLIException {
checkType(type);
WebResource resource = service.path(instances.path).path(type)
.path(entity);
resource = addParamsToResource(resource, start, end, runid, colo,
- null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null);
+ null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null, isForced);
if (lifeCycles != null) {
checkLifeCycleOption(lifeCycles, type);
@@ -800,7 +824,7 @@ public class FalconClient {
WebResource resource = service.path(entities.path)
.path(entityType);
resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags,
- orderBy, sortOrder, offset, numResults, null, searchPattern);
+ orderBy, sortOrder, offset, numResults, null, searchPattern, null);
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 6b10679..07fafb5 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -74,7 +74,7 @@ public abstract class AbstractWorkflowEngine {
List<LifeCycle> lifeCycles) throws FalconException;
public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
- List<LifeCycle> lifeCycles) throws FalconException;
+ List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException;
public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props,
List<LifeCycle> lifeCycles) throws FalconException;
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index ef2152f..d503d22 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -142,10 +142,12 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue
---+++Rerun
-Rerun option is used to rerun instances of a given process. This option is valid only for process instances in terminal state, i.e. SUCCEDDED, KILLED or FAILED. Optionally, you can specify the properties to override.
+Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED.
+If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun.
+Additionally, you can also specify properties to override via a properties file.
Usage:
-$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-file <<properties file>>]
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>]
---+++Resume
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/restapi/InstanceRerun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki
index d98ae3a..ec30a1e 100644
--- a/docs/src/site/twiki/restapi/InstanceRerun.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki
@@ -5,7 +5,7 @@
* <a href="#Examples">Examples</a>
---++ Description
-Rerun instances of an entity.
+Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in the workflow.
---++ Parameters
* :entity-type can either be a feed or a process.
@@ -13,6 +13,7 @@ Rerun instances of an entity.
* start is the start time of the instance that you want to refer to
* end is the end time of the instance that you want to refer to
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
+ * force <optional param> can be used to forcefully rerun the entire instance.
---++ Results
Results of the rerun command.
@@ -40,3 +41,25 @@ POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&star
"status": "SUCCEEDED"
}
</verbatim>
+
+<verbatim>
+POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "details": "",
+ "startTime": "2013-10-21T15:10:47-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "RUNNING",
+ "instance": "2012-04-03T07:00Z"
+ }
+ ],
+ "requestId": "default\/7a3582bd-608c-45a7-9b74-1837b51ba6d5\n",
+ "message": "default\/RERUN\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 2733cca..62c04ea 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -496,7 +496,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public InstancesResult reRunInstances(Entity entity, Date start, Date end,
- Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ Properties props, List<LifeCycle> lifeCycles,
+ Boolean isForced) throws FalconException {
+ if (isForced != null && isForced) {
+ props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+ }
return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles);
}
@@ -1317,7 +1321,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
jobprops.putAll(props);
}
//if user has set any of these oozie rerun properties then force rerun flag is ignored
- if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES) && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
+ if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+ && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
}
jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index ed30869..f9f41d3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -443,9 +443,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public InstancesResult reRunInstance(String type, String entity, String startStr,
String endStr, HttpServletRequest request,
- String colo, List<LifeCycle> lifeCycles) {
+ String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
checkColo(colo);
checkType(type);
try {
@@ -458,12 +459,13 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
return wfEngine.reRunInstances(entityObject,
- startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
+ startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced);
} catch (Exception e) {
LOG.error("Failed to rerun instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
}
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
private Properties getProperties(HttpServletRequest request) throws IOException {
Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index 7f261ce..b8db4b5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -138,8 +138,8 @@ public class HTTPChannel extends AbstractChannel {
Annotation[][] paramAnnotations = method.getParameterAnnotations();
StringBuilder queryString = new StringBuilder("?");
for (int index = 0; index < args.length; index++) {
- if (args[index] instanceof String) {
- String arg = (String) args[index];
+ if (args[index] instanceof String || args[index] instanceof Boolean) {
+ String arg = String.valueOf(args[index]);
for (int annotation = 0; annotation < paramAnnotations[index].length; annotation++) {
Annotation paramAnnotation = paramAnnotations[index][annotation];
String annotationClass = paramAnnotation.annotationType().getName();
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index e6cf904..e304bd8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -327,14 +327,15 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("end-time") @QueryParam("end") final String endStr,
@Context HttpServletRequest request,
@Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @Dimension("force") @QueryParam("force") final Boolean isForced) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy<InstancesResult>(InstancesResult.class) {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("reRunInstance",
- type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles);
+ type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced);
}
}.execute(colo, type, entity);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index d4e0ae0..dc533a2 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -224,8 +224,9 @@ public class InstanceManager extends AbstractInstanceManager {
@Dimension("end-time") @QueryParam("end") String endStr,
@Context HttpServletRequest request,
@Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
- return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles);
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @Dimension("force") @QueryParam("force") Boolean isForced) {
+ return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles, isForced);
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck