You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/19 18:14:36 UTC

[1/4] falcon git commit: FALCON-1723 Rerun with skip fail actions won't work in few cases (by Pavan Kolamuri)

Repository: falcon
Updated Branches:
  refs/heads/master ae59db3c8 -> ecefdd079


FALCON-1723 Rerun with skip fail actions won't work in few cases (by Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccd536a8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccd536a8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccd536a8

Branch: refs/heads/master
Commit: ccd536a8da34e9f2547c1faaf3f32987bb976098
Parents: 0084c35
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 19:24:42 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 19:24:42 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 docs/src/site/twiki/FalconCLI.twiki             |  2 +-
 .../workflow/engine/OozieWorkflowEngine.java    | 54 ++++++++++++++------
 .../workflow/engine/FalconWorkflowEngine.java   |  3 ++
 4 files changed, 45 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 43c7b81..f80e7a1 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -116,6 +116,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1715 IllegalStateException in MetadataMappingService when entity is scheduled via native scheduler (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index c62d56d..5395f12 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -241,7 +241,7 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -continue
 
 Rerun option is used to rerun instances of a given process. On issuing a rerun, by default the execution resumes from the last failed node in the workflow. This option is valid only for process instances in terminal state, i.e. SUCCEEDED, KILLED or FAILED.
 If one wants to forcefully rerun the entire workflow, -force should be passed along with -rerun
-Additionally, you can also specify properties to override via a properties file.
+Additionally, you can also specify properties to override via a properties file; in case of a conflict, these properties take precedence over the -force option.
 
 Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -rerun -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'" [-force] [-file <<properties file>>]

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 72c029c..04f5e93 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -560,10 +560,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     public InstancesResult reRunInstances(Entity entity, Date start, Date end,
                                           Properties props, List<LifeCycle> lifeCycles,
                                           Boolean isForced) throws FalconException {
-        if (isForced != null && isForced) {
-            props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+        if (isForced == null) {
+            isForced = false;
         }
-        return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles);
+        return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, false, isForced);
     }
 
     @Override
@@ -642,8 +642,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return doJobAction(action, entity, start, end, props, lifeCycles, null);
     }
 
-    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props,
-                                        List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException {
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+                                        Properties props, List<LifeCycle> lifeCycles,
+                                        Boolean allAttempts, boolean isForced) throws FalconException {
         Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end, lifeCycles);
         List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
         List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
@@ -674,7 +676,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 instance.sourceCluster = sourceCluster;
                 if (action.equals(JobAction.STATUS) && Boolean.TRUE.equals(allAttempts)) {
                     try {
-                        performAction(cluster, action, coordinatorAction, props, instance);
+                        performAction(cluster, action, coordinatorAction, props, instance, isForced);
                         if (instance.getRunId() > 0) {
                             instanceList = getAllInstances(cluster, coordinatorAction, nominalTimeStr);
                         } else {
@@ -692,7 +694,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     }
                 } else {
                     try {
-                        performAction(cluster, action, coordinatorAction, props, instance);
+                        performAction(cluster, action, coordinatorAction, props, instance, isForced);
                     } catch (FalconException e) {
                         LOG.warn("Unable to perform action {} on cluster", action, e);
                         instance.status = WorkflowStatus.ERROR;
@@ -711,6 +713,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesResult;
     }
 
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props,
+                                        List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException {
+        return doJobAction(action, entity, start, end, props, lifeCycles, allAttempts, false);
+    }
+
     private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start,
                                                       Date end, Properties props,
                                                       List<LifeCycle> lifeCycles) throws FalconException {
@@ -878,7 +887,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
-        Properties props, InstancesResult.Instance instance) throws FalconException {
+        Properties props, InstancesResult.Instance instance, boolean isForced) throws FalconException {
         WorkflowJob jobInfo = null;
         String status = coordinatorAction.getStatus().name();
         if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
@@ -925,7 +934,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 status = Status.RUNNING.name();
             } else if (jobInfo != null && WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
                 //wf re-run
-                reRun(cluster, jobInfo.getId(), props, false);
+                reRun(cluster, jobInfo.getId(), props, isForced);
                 status = Status.RUNNING.name();
             }
             break;
@@ -1481,17 +1490,31 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         OozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
-            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
-            if (props != null) {
-                jobprops.putAll(props);
+            if (props == null) {
+                props = new Properties();
             }
+
             //if user has set any of these oozie rerun properties then force rerun flag is ignored
-            if (!jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
-                    && !jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
-                jobprops.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
+            if (!props.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && !props.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                props.put(OozieClient.RERUN_FAIL_NODES, String.valueOf(!isForced));
             }
+
+            Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
+            jobprops.putAll(props);
+
             jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
             jobprops.remove(OozieClient.BUNDLE_APP_PATH);
+
+            // If both properties exist, one of them must be removed; otherwise the rerun will fail.
+            // This happens when the user ran the workflow with the skip-nodes property and
+            // then attempts a force rerun or a rerun with the fail-nodes property.
+            if (jobprops.containsKey(OozieClient.RERUN_FAIL_NODES)
+                    && jobprops.containsKey(OozieClient.RERUN_SKIP_NODES)) {
+                LOG.warn("Both " + OozieClient.RERUN_SKIP_NODES + " and " + OozieClient.RERUN_FAIL_NODES
+                        + " are present in workflow params removing" + OozieClient.RERUN_SKIP_NODES);
+                jobprops.remove(OozieClient.RERUN_SKIP_NODES);
+            }
             client.reRun(jobId, jobprops);
             assertStatus(cluster, jobId, Job.Status.RUNNING);
             LOG.info("Rerun job {} on cluster {}", jobId, cluster);
@@ -1501,6 +1524,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+
     private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException {
 
         String actualStatus = null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccd536a8/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index efe9049..c9d6b86 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -371,6 +371,9 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
                                           List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException {
+        if (isForced == null) {
+            isForced = false;
+        }
         return doJobAction(JobAction.RERUN, entity, start, end, props, lifeCycles, isForced);
     }
 


[4/4] falcon git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon

Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ecefdd07
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ecefdd07
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ecefdd07

Branch: refs/heads/master
Commit: ecefdd079731f11f90e5916ef20afd0815e2d748
Parents: 8a739e1 ae59db3
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 22:44:19 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 22:44:19 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/client/FalconCLIException.java       |   2 +-
 .../org/apache/falcon/client/FalconClient.java  | 931 +++++++------------
 3 files changed, 330 insertions(+), 605 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ecefdd07/CHANGES.txt
----------------------------------------------------------------------


[3/4] falcon git commit: FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)

Posted by pa...@apache.org.
FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8a739e1f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8a739e1f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8a739e1f

Branch: refs/heads/master
Commit: 8a739e1f631a9a5c7805c5c9f0e1b0521b6c3a06
Parents: 5fb3a7a
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 19:39:40 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 19:39:40 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/resource/EntitySummaryResult.java    |  2 +-
 .../falcon/state/store/InMemoryStateStore.java  | 21 ++++++++++++
 .../falcon/state/store/InstanceStateStore.java  | 13 ++++++++
 .../falcon/state/store/jdbc/BeanMapperUtil.java | 20 +++++++++++
 .../falcon/state/store/jdbc/InstanceBean.java   |  3 +-
 .../falcon/state/store/jdbc/JDBCStateStore.java | 17 ++++++++++
 .../workflow/engine/FalconWorkflowEngine.java   | 23 ++++++++++++-
 .../state/service/store/TestJDBCStateStore.java | 35 ++++++++++++++++++++
 .../apache/falcon/unit/FalconUnitClient.java    |  6 ++++
 .../InstanceSchedulerManagerJerseyIT.java       | 24 ++++++++++++++
 11 files changed, 163 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 255706d..f616298 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,8 @@ Proposed Release Version: 0.9
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1742 Implement instance summary api for native scheduler (Pallavi Rao)
+
     FALCON-1677 Support re-tries for timed-out instances (Narayan Periwal via Pallavi Rao)
 
     FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
index 4a885ec..3ebfe26 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
@@ -35,7 +35,7 @@ public class EntitySummaryResult extends APIResult {
      * Workflow status as being set in result object.
      */
     public static enum WorkflowStatus {
-        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR
+        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, READY
     }
 
     @XmlElement

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index c4ced46..69f1e48 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -218,6 +218,27 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
+    public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+            DateTime start, DateTime end) throws StateStoreException {
+        Map<InstanceState.STATE, Long> summary = new HashMap<>();
+        for (InstanceState state : getAllExecutionInstances(entity, cluster)) {
+            ExecutionInstance instance = state.getInstance();
+            DateTime instanceTime = instance.getInstanceTime();
+            // Start date inclusive and end date exclusive.
+            // If start date and end date are equal no instances will be added.
+            if ((instanceTime.isEqual(start) || instanceTime.isAfter(start))
+                    && instanceTime.isBefore(end)) {
+                if (summary.containsKey(state.getCurrentState())) {
+                    summary.put(state.getCurrentState(), summary.get(state.getCurrentState()) + 1L);
+                } else {
+                    summary.put(state.getCurrentState(), 1L);
+                }
+            }
+        }
+        return summary;
+    }
+
+    @Override
     public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
         EntityClusterID id = new EntityClusterID(entity, cluster);
         if (!entityStates.containsKey(id.getEntityID().getKey())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 8ce8fa0..b7269f8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.store;
 
+import java.util.Map;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.state.EntityClusterID;
@@ -101,6 +102,18 @@ public interface InstanceStateStore {
      */
     Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID,
                                                     Collection<InstanceState.STATE> states) throws StateStoreException;
+
+    /**
+     * Counts the execution instances of an entity on a cluster in a time range, per state.
+     * @param entity
+     * @param cluster
+     * @param start
+     * @param end
+     * @return - A map of state and the no. of instances in that state.
+     * @throws StateStoreException
+     */
+    Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+                                                               DateTime start, DateTime end) throws StateStoreException;
     /**
      * @param entity
      * @param cluster

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 3def14a..194819e 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
@@ -301,4 +303,22 @@ public final class BeanMapperUtil {
             IOUtils.closeQuietly(out);
         }
     }
+
+    /**
+     * @param summary
+     * @return A map of state and count given the JQL result.
+     */
+    public static Map<InstanceState.STATE, Long> getInstanceStateSummary(Collection<Object[]> summary) {
+        Map<InstanceState.STATE, Long> stateSummary = new HashMap<>();
+        if (summary != null && !summary.isEmpty()) {
+            for (Object[] projection : summary) {
+                // Has to have 2 columns (state and count), else Array will be out of bounds.
+                if (projection.length == 2) {
+                    stateSummary.put(InstanceState.STATE.valueOf((String)projection[0]),
+                            Long.valueOf(((Number)projection[1]).longValue()));
+                }
+            }
+        }
+        return stateSummary;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 7f7b966..e8385b1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -50,7 +50,8 @@ import java.sql.Timestamp;
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
         @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
-        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+        @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
 })
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
 @Table(name = "INSTANCES")

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 2eafbce..1c07286 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
@@ -357,6 +358,22 @@ public final class JDBCStateStore extends AbstractStateStore {
     }
 
     @Override
+    public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+            DateTime start, DateTime end) throws StateStoreException {
+        String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE");
+        q.setParameter("entityId", entityKey);
+        q.setParameter("cluster", cluster);
+        q.setParameter("startTime", new Timestamp(start.getMillis()));
+        q.setParameter("endTime", new Timestamp(end.getMillis()));
+        List result  = q.getResultList();
+        entityManager.close();
+
+        return BeanMapperUtil.getInstanceStateSummary(result);
+    }
+
+    @Override
     public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
                                                            Collection<InstanceState.STATE> states, DateTime start,
                                                            DateTime end) throws StateStoreException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index c9d6b86..7ce2420 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.workflow.engine;
 
+import java.util.HashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
@@ -398,7 +399,27 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
                                              List<LifeCycle> lifeCycles) throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<InstancesSummaryResult.InstanceSummary> instanceSummaries = new ArrayList<>();
+
+        // Iterate over entity clusters
+        for (String cluster : clusters) {
+            LOG.debug("Retrieving summary of instances for cluster : {}", cluster);
+            Map<InstanceState.STATE, Long> summaries = STATE_STORE.getExecutionInstanceSummary(entity, cluster,
+                    new DateTime(start), new DateTime(end));
+            Map<String, Long> summaryMap = new HashMap<>();
+            // Iterate over the map and convert STATE to String
+            for (Map.Entry<InstanceState.STATE, Long> summary : summaries.entrySet()) {
+                summaryMap.put(summary.getKey().name(), summary.getValue());
+            }
+            instanceSummaries.add(new InstancesSummaryResult.InstanceSummary(cluster, summaryMap));
+        }
+
+        InstancesSummaryResult instancesSummaryResult =
+                new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
+        instancesSummaryResult.setInstancesSummary(instanceSummaries.
+                toArray(new InstancesSummaryResult.InstanceSummary[instanceSummaries.size()]));
+        return instancesSummaryResult;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index 2a383cc..d597e27 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.service.store;
 
+import java.util.Map;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
@@ -445,7 +446,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
         Assert.assertEquals(instances.size(), 0);
     }
 
+    @Test
+    public void testGetExecutionSummaryWithRange() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        long instance1Time = System.currentTimeMillis() - 180000;
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "clicksProcess");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
 
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.SUCCEEDED);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+
+        Map<InstanceState.STATE, Long> summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+                "cluster1", new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+        Assert.assertEquals(summary.size(), 1);
+        Assert.assertEquals(summary.get(InstanceState.STATE.RUNNING).longValue(), 1L);
+
+        summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+                "cluster1", new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(summary.size(), 1);
+        Assert.assertEquals(summary.get(InstanceState.STATE.SUCCEEDED).longValue(), 1L);
+    }
 
     private void initInstanceState(InstanceState instanceState) {
         instanceState.setCurrentState(InstanceState.STATE.READY);

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index a82cf03..37221f3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -324,6 +324,12 @@ public class FalconUnitClient extends AbstractFalconClient {
                                                         String colo, List<LifeCycle> lifeCycles, String filterBy,
                                                         String orderBy, String sortOrder, String doAsUser) throws
             FalconCLIException {
+        if (StringUtils.isBlank(orderBy)) {
+            orderBy = DEFAULT_ORDERBY;
+        }
+        if (StringUtils.isBlank(sortOrder)) {
+            sortOrder = DEFAULT_SORTED_ORDER;
+        }
         return localInstanceManager.getSummary(type, entity, start, end, colo, lifeCycles, filterBy, orderBy,
                 sortOrder);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index b1c8ce0..b06725f 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -144,4 +144,28 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
         Assert.assertEquals(result.getInstances()[0].getInstance(), "2012-04-22T00:00Z");
         Assert.assertEquals(result.getInstances()[2].getInstance(), START_INSTANCE);
     }
+
+    @Test
+    public void testInstanceSummary() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        setupProcessExecution(context, overlay, 3);
+
+        String processName = overlay.get(PROCESS_NAME);
+        String colo = overlay.get(COLO);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
+
+        InstancesSummaryResult result = falconUnitClient.getSummaryOfInstances(EntityType.PROCESS.toString(),
+                processName, START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, null);
+
+        Assert.assertEquals(result.getInstancesSummary().length, 1);
+        Assert.assertEquals(result.getInstancesSummary()[0].getCluster(), overlay.get(CLUSTER));
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().size(), 2);
+        // Parallelism is 2
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L);
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L);
+    }
 }


[2/4] falcon git commit: FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (by Pallavi Rao)

Posted by pa...@apache.org.
FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (by Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5fb3a7ab
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5fb3a7ab
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5fb3a7ab

Branch: refs/heads/master
Commit: 5fb3a7ab830255d9321ff4a5a8adb481c6c0d683
Parents: ccd536a
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 19:27:21 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 19:27:21 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/execution/ProcessExecutor.java       | 39 +++++++++++++-------
 .../org/apache/falcon/state/EntityState.java    |  3 +-
 3 files changed, 29 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f80e7a1..255706d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -116,6 +116,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (Pallavi Rao)
+
     FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao)
 
     FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 40fe1b3..188cec2 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Properties;
-import java.util.TimeZone;
 
 /**
  * This class is responsible for managing execution instances of a process.
@@ -153,8 +152,7 @@ public class ProcessExecutor extends EntityExecutor {
                 suspend(instance);
             } catch (FalconException e) {
                 // Proceed with next
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
             }
         }
         for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -163,8 +161,7 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 suspend(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
             }
         }
         // Some errors
@@ -173,6 +170,24 @@ public class ProcessExecutor extends EntityExecutor {
         }
     }
 
+    // Logs and returns the error message for a failed operation, or "" if the instance is already in a terminal state.
+    private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action)
+        throws StateStoreException {
+        try {
+            // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
+            InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+            if (InstanceState.getTerminalStates().contains(currentState)) {
+                return "";
+            }
+        } catch (StateStoreException sse) {
+            throw sse;
+        }
+
+        String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage();
+        LOG.error(errMsg, e);
+        return errMsg;
+    }
+
     //  Returns last materialized instance's time.
     private Date getLastInstanceTime() throws StateStoreException {
         InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
@@ -198,8 +213,8 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 resume(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance suspend failed for : " + instance.getId(), e);
+                errMsg.append("Instance resume failed for : " + instance.getId() + " due to " + e.getMessage());
+                LOG.error("Instance resume failed for : " + instance.getId(), e);
             }
         }
         registerForNotifications(getLastInstanceTime());
@@ -219,8 +234,7 @@ public class ProcessExecutor extends EntityExecutor {
                 kill(instance);
             } catch (FalconException e) {
                 // Proceed with next
-                errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance kill failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
         for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -229,8 +243,7 @@ public class ProcessExecutor extends EntityExecutor {
             try {
                 kill(instance);
             } catch (FalconException e) {
-                errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
-                LOG.error("Instance kill failed for : " + instance.getId(), e);
+                errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
         // Some errors
@@ -248,12 +261,10 @@ public class ProcessExecutor extends EntityExecutor {
             LOG.error("Suspend failed for instance id : " + instance.getId(), e);
             throw new FalconException("Suspend failed for instance : " + instance.getId(), e);
         }
-
     }
 
     @Override
     public void resume(ExecutionInstance instance) throws FalconException {
-
         try {
             instance.resume();
             if (((ProcessExecutionInstance) instance).isScheduled()) {
@@ -452,7 +463,7 @@ public class ProcessExecutor extends EntityExecutor {
         requestBuilder.setFrequency(process.getFrequency())
                 .setStartTime(new DateTime(startTime))
                 .setEndTime(new DateTime(endTime))
-                .setTimeZone(TimeZone.getTimeZone("UTC"));
+                .setTimeZone(EntityUtil.getTimeZone(process));
         NotificationServicesRegistry.register(requestBuilder.build());
         LOG.info("Registered for a time based notification for process {}  with frequency: {}, "
                 + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime);

http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index f44f174..ae57fa1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -81,7 +81,8 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
         SUBMIT,
         SCHEDULE,
         SUSPEND,
-        RESUME
+        RESUME,
+        KILL
     }
 
     /**