You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/03/04 08:34:55 UTC

falcon git commit: FALCON-950 Rerun does not work on succeeded instances. Contributed by Suhas Vasu

Repository: falcon
Updated Branches:
  refs/heads/master ccdf02e7e -> fd8614538


FALCON-950 Rerun does not work on succeeded instances. Contributed by Suhas Vasu


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fd861453
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fd861453
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fd861453

Branch: refs/heads/master
Commit: fd861453807246fc3af2c82b137c643dad2a0e1f
Parents: ccdf02e
Author: Suhas Vasu <su...@inmobi.com>
Authored: Wed Mar 4 13:04:40 2015 +0530
Committer: Suhas Vasu <su...@inmobi.com>
Committed: Wed Mar 4 13:04:40 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/falcon/cli/FalconCLI.java   | 16 ++++++++-
 .../org/apache/falcon/client/FalconClient.java  | 36 ++++++++++++++++----
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +-
 docs/src/site/twiki/FalconCLI.twiki             |  6 ++--
 docs/src/site/twiki/restapi/InstanceRerun.twiki | 25 +++++++++++++-
 .../workflow/engine/OozieWorkflowEngine.java    |  9 +++--
 .../resource/AbstractInstanceManager.java       |  6 ++--
 .../falcon/resource/channel/HTTPChannel.java    |  4 +--
 .../resource/proxy/InstanceManagerProxy.java    |  5 +--
 .../apache/falcon/resource/InstanceManager.java |  5 +--
 11 files changed, 95 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 21ee6e8..a2a54bf 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -92,6 +92,8 @@ Trunk (Unreleased)
    (Suhas vasu)
 
   BUG FIXES
+   FALCON-950 Rerun does not work on succeeded instances (Suhas Vasu)
+
    FALCON-1048 Incorrect documentation for feed instacnce listing api. (Suhas
    Vasu via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index ac76a9c..92b5347 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -95,6 +95,7 @@ public class FalconCLI {
     public static final String NUM_RESULTS_OPT = "numResults";
     public static final String NUM_INSTANCES_OPT = "numInstances";
     public static final String PATTERN_OPT = "pattern";
+    public static final String FORCE_RERUN_FLAG = "force";
 
     public static final String INSTANCE_CMD = "instance";
     public static final String START_OPT = "start";
@@ -286,11 +287,15 @@ public class FalconCLI {
         } else if (optionsList.contains(RERUN_OPT)) {
             validateNotEmpty(start, START_OPT);
             validateNotEmpty(end, END_OPT);
+            boolean isForced = false;
+            if (optionsList.contains(FORCE_RERUN_FLAG)) {
+                isForced = true;
+            }
             result =
                 ResponseHelper.getString(client
                         .rerunInstances(type, entity, start, end, filePath, colo,
                                 clusters, sourceClusters,
-                                lifeCycles));
+                                lifeCycles, isForced));
         } else if (optionsList.contains(LOG_OPT)) {
             validateOrderBy(orderBy, instanceAction);
             validateFilterBy(filterBy, instanceAction);
@@ -355,6 +360,12 @@ public class FalconCLI {
                 throw new FalconCLIException("Invalid argument: sourceClusters");
             }
         }
+
+        if (optionsList.contains(FORCE_RERUN_FLAG)) {
+            if (!optionsList.contains(RERUN_OPT)) {
+                throw new FalconCLIException("Force option can be used only with instance rerun");
+            }
+        }
     }
 
     private void entityCommand(CommandLine commandLine, FalconClient client)
@@ -797,6 +808,8 @@ public class FalconCLI {
                 "Start returning instances from this offset");
         Option numResults = new Option(NUM_RESULTS_OPT, true,
                 "Number of results to return per request");
+        Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
+                "Flag to forcefully rerun entire workflow of an instance");
 
         instanceOptions.addOption(url);
         instanceOptions.addOptionGroup(group);
@@ -815,6 +828,7 @@ public class FalconCLI {
         instanceOptions.addOption(orderBy);
         instanceOptions.addOption(sortOrder);
         instanceOptions.addOption(numResults);
+        instanceOptions.addOption(forceRerun);
 
         return instanceOptions;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 86397c4..a866bb0 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -461,7 +461,8 @@ public class FalconClient {
 
     public InstancesResult rerunInstances(String type, String entity, String start,
                                  String end, String filePath, String colo,
-                                 String clusters, String sourceClusters, List<LifeCycle> lifeCycles)
+                                 String clusters, String sourceClusters, List<LifeCycle> lifeCycles,
+                                 Boolean isForced)
         throws FalconCLIException, IOException {
 
         StringBuilder buffer = new StringBuilder();
@@ -480,7 +481,7 @@ public class FalconClient {
         }
         String temp = (buffer.length() == 0) ? null : buffer.toString();
         return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
-                getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles);
+                getServletInputStream(clusters, sourceClusters, temp), null, colo, lifeCycles, isForced);
     }
 
     public InstancesResult getLogsOfInstances(String type, String entity, String start,
@@ -605,7 +606,8 @@ public class FalconClient {
                                             String start, String end, String runId, String colo,
                                             String fields, String filterBy, String tags,
                                             String orderBy, String sortOrder, Integer offset,
-                                            Integer numResults, Integer numInstances, String searchPattern) {
+                                            Integer numResults, Integer numInstances, String searchPattern,
+                                            Boolean isForced) {
 
         if (!StringUtils.isEmpty(fields)) {
             resource = resource.queryParam("fields", fields);
@@ -647,6 +649,9 @@ public class FalconClient {
         if (!StringUtils.isEmpty(searchPattern)) {
             resource = resource.queryParam("pattern", searchPattern);
         }
+        if (isForced != null) {
+            resource = resource.queryParam("force", String.valueOf(isForced));
+        }
         return resource;
 
     }
@@ -664,7 +669,7 @@ public class FalconClient {
         resource = addParamsToResource(resource, start, end, null, null,
                 fields, filterBy, filterTags,
                 orderBy, sortOrder,
-                offset, numResults, numInstances, null);
+                offset, numResults, numInstances, null, null);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -734,16 +739,35 @@ public class FalconClient {
                 .getEntity(InstancesResult.class);
     }
 
+    private InstancesResult sendInstanceRequest(Instances instances, String type,
+                                                String entity, String start, String end, InputStream props,
+                                                String runid, String colo, List<LifeCycle> lifeCycles,
+                                                Boolean isForced) throws FalconCLIException {
+        return sendInstanceRequest(instances, type, entity, start, end, props,
+                runid, colo, lifeCycles, "", "", "", 0, DEFAULT_NUM_RESULTS, isForced).getEntity(InstancesResult.class);
+    }
+
+
+
     private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
                                        String start, String end, InputStream props, String runid, String colo,
                                        List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
                                        Integer offset, Integer numResults) throws FalconCLIException {
+
+        return sendInstanceRequest(instances, type, entity, start, end, props,
+                runid, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, null);
+    }
+
+    private ClientResponse sendInstanceRequest(Instances instances, String type, String entity,
+                                       String start, String end, InputStream props, String runid, String colo,
+                                       List<LifeCycle> lifeCycles, String filterBy, String orderBy, String sortOrder,
+                                       Integer offset, Integer numResults, Boolean isForced) throws FalconCLIException {
         checkType(type);
         WebResource resource = service.path(instances.path).path(type)
                 .path(entity);
 
         resource = addParamsToResource(resource, start, end, runid, colo,
-                null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null);
+                null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null, isForced);
 
         if (lifeCycles != null) {
             checkLifeCycleOption(lifeCycles, type);
@@ -800,7 +824,7 @@ public class FalconClient {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags,
-                orderBy, sortOrder, offset, numResults, null, searchPattern);
+                orderBy, sortOrder, offset, numResults, null, searchPattern, null);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 6b10679..07fafb5 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -74,7 +74,7 @@ public abstract class AbstractWorkflowEngine {
                                                   List<LifeCycle> lifeCycles) throws FalconException;
 
     public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
-                                                   List<LifeCycle> lifeCycles) throws FalconException;
+                                                   List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException;
 
     public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props,
                                                      List<LifeCycle> lifeCycles) throws FalconException;

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index ef2152f..d503d22 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -142,10 +142,12 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue
 
 ---+++Rerun
 
-Rerun option is used to rerun instances of a given process. This option is valid only for process instances in terminal state, i.e. SUCCEDDED, KILLED or FAILED. Optionally, you can specify the properties to override.
+Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED.
+If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun
+Additionally, you can also specify properties to override via a properties file.
 
 Usage:
-$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-file <<properties file>>]
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>]
 
 ---+++Resume
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/docs/src/site/twiki/restapi/InstanceRerun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRerun.twiki b/docs/src/site/twiki/restapi/InstanceRerun.twiki
index d98ae3a..ec30a1e 100644
--- a/docs/src/site/twiki/restapi/InstanceRerun.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRerun.twiki
@@ -5,7 +5,7 @@
    * <a href="#Examples">Examples</a>
 
 ---++ Description
-Rerun instances of an entity.
+Rerun instances of an entity. On issuing a rerun, by default the execution resumes from the last failed node in the workflow.
 
 ---++ Parameters
    * :entity-type can either be a feed or a process.
@@ -13,6 +13,7 @@ Rerun instances of an entity.
    * start is the start time of the instance that you want to refer to
    * end is the end time of the instance that you want to refer to
    * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
+   * force <optional param> can be used to forcefully rerun the entire instance.
 
 ---++ Results
 Results of the rerun command.
@@ -40,3 +41,25 @@ POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&star
     "status": "SUCCEEDED"
 }
 </verbatim>
+
+<verbatim>
+POST http://localhost:15000/api/instance/rerun/process/SampleProcess?colo=*&start=2013-04-03T07:00Z&end=2014-04-03T07:00Z&force=true
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "instances": [
+        {
+            "details": "",
+            "startTime": "2013-10-21T15:10:47-07:00",
+            "cluster": "primary-cluster",
+            "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+            "status": "RUNNING",
+            "instance": "2012-04-03T07:00Z"
+        }
+    ],
+    "requestId": "default\/7a3582bd-608c-45a7-9b74-1837b51ba6d5\n",
+    "message": "default\/RERUN\n",
+    "status": "SUCCEEDED"
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 2733cca..62c04ea 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -496,7 +496,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end,
-                                          Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+                                          Properties props, List<LifeCycle> lifeCycles,
+                                          Boolean isForced) throws FalconException {
+        if (isForced != null && isForced) {
+            props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+        }
         return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles);
     }
 
@@ -1317,7 +1321,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 jobprops.putAll(props);
             }
             //if user has set any of these oozie rerun properties then force rerun flag is ignored
-            if (!jobprops.contains(OozieClient.RERUN_FAIL_NODES) && !jobprops.contains(OozieClient.RERUN_SKIP_NODES)) {
+            if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
                 jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
             }
             jobprops.remove(OozieClient.COORDINATOR_APP_PATH);

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index ed30869..f9f41d3 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -443,9 +443,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
     }
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public InstancesResult reRunInstance(String type, String entity, String startStr,
                                          String endStr, HttpServletRequest request,
-                                         String colo, List<LifeCycle> lifeCycles) {
+                                         String colo, List<LifeCycle> lifeCycles, Boolean isForced) {
         checkColo(colo);
         checkType(type);
         try {
@@ -458,12 +459,13 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
             Properties props = getProperties(request);
             AbstractWorkflowEngine wfEngine = getWorkflowEngine();
             return wfEngine.reRunInstances(entityObject,
-                    startAndEndDate.first, startAndEndDate.second, props, lifeCycles);
+                    startAndEndDate.first, startAndEndDate.second, props, lifeCycles, isForced);
         } catch (Exception e) {
             LOG.error("Failed to rerun instances", e);
             throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
         }
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private Properties getProperties(HttpServletRequest request) throws IOException {
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index 7f261ce..b8db4b5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -138,8 +138,8 @@ public class HTTPChannel extends AbstractChannel {
         Annotation[][] paramAnnotations = method.getParameterAnnotations();
         StringBuilder queryString = new StringBuilder("?");
         for (int index = 0; index < args.length; index++) {
-            if (args[index] instanceof String) {
-                String arg = (String) args[index];
+            if (args[index] instanceof String || args[index] instanceof Boolean) {
+                String arg = String.valueOf(args[index]);
                 for (int annotation = 0; annotation < paramAnnotations[index].length; annotation++) {
                     Annotation paramAnnotation = paramAnnotations[index][annotation];
                     String annotationClass = paramAnnotation.annotationType().getName();

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index e6cf904..e304bd8 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -327,14 +327,15 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
             @Dimension("end-time") @QueryParam("end") final String endStr,
             @Context HttpServletRequest request,
             @Dimension("colo") @QueryParam("colo") String colo,
-            @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+            @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+            @Dimension("force") @QueryParam("force") final Boolean isForced) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         return new InstanceProxy<InstancesResult>(InstancesResult.class) {
             @Override
             protected InstancesResult doExecute(String colo) throws FalconException {
                 return getInstanceManager(colo).invoke("reRunInstance",
-                        type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles);
+                        type, entity, startStr, endStr, bufferedRequest, colo, lifeCycles, isForced);
             }
         }.execute(colo, type, entity);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fd861453/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index d4e0ae0..dc533a2 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -224,8 +224,9 @@ public class InstanceManager extends AbstractInstanceManager {
             @Dimension("end-time") @QueryParam("end") String endStr,
             @Context HttpServletRequest request,
             @Dimension("colo") @QueryParam("colo") String colo,
-            @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
-        return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles);
+            @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+            @Dimension("force") @QueryParam("force") Boolean isForced) {
+        return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles, isForced);
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck