You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/19 18:14:38 UTC

[3/4] falcon git commit: FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)

FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8a739e1f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8a739e1f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8a739e1f

Branch: refs/heads/master
Commit: 8a739e1f631a9a5c7805c5c9f0e1b0521b6c3a06
Parents: 5fb3a7a
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 19:39:40 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 19:39:40 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../falcon/resource/EntitySummaryResult.java    |  2 +-
 .../falcon/state/store/InMemoryStateStore.java  | 21 ++++++++++++
 .../falcon/state/store/InstanceStateStore.java  | 13 ++++++++
 .../falcon/state/store/jdbc/BeanMapperUtil.java | 20 +++++++++++
 .../falcon/state/store/jdbc/InstanceBean.java   |  3 +-
 .../falcon/state/store/jdbc/JDBCStateStore.java | 17 ++++++++++
 .../workflow/engine/FalconWorkflowEngine.java   | 23 ++++++++++++-
 .../state/service/store/TestJDBCStateStore.java | 35 ++++++++++++++++++++
 .../apache/falcon/unit/FalconUnitClient.java    |  6 ++++
 .../InstanceSchedulerManagerJerseyIT.java       | 24 ++++++++++++++
 11 files changed, 163 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 255706d..f616298 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,8 @@ Proposed Release Version: 0.9
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1742 Implement instance summary api for native scheduler (Pallavi Rao)
+
     FALCON-1677 Support re-tries for timed-out instances (Narayan Periwal via Pallavi Rao)
 
     FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
index 4a885ec..3ebfe26 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
@@ -35,7 +35,7 @@ public class EntitySummaryResult extends APIResult {
      * Workflow status as being set in result object.
      */
     public static enum WorkflowStatus {
-        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR
+        WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, READY // READY: native-scheduler state (FALCON-1742)
     }
 
     @XmlElement

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index c4ced46..69f1e48 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -218,6 +218,27 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
+    public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+            DateTime start, DateTime end) throws StateStoreException {
+        Map<InstanceState.STATE, Long> counts = new HashMap<>();
+        for (InstanceState instanceState : getAllExecutionInstances(entity, cluster)) {
+            DateTime instanceTime = instanceState.getInstance().getInstanceTime();
+            // Half-open range [start, end): skip instances before start or at/after end, so an
+            // equal start and end yield an empty summary. This is the same range predicate as the
+            // GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE named query used by the JDBC store.
+            boolean outOfRange = instanceTime.isBefore(start) || !instanceTime.isBefore(end);
+            if (outOfRange) {
+                continue;
+            }
+            InstanceState.STATE currentState = instanceState.getCurrentState();
+            // A missing key means this is the first instance seen in that state.
+            Long countSoFar = counts.get(currentState);
+            counts.put(currentState, countSoFar == null ? 1L : countSoFar + 1L);
+        }
+        return counts;
+    }
+
+    @Override
     public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
         EntityClusterID id = new EntityClusterID(entity, cluster);
         if (!entityStates.containsKey(id.getEntityID().getKey())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 8ce8fa0..b7269f8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.store;
 
+import java.util.Map;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.state.EntityClusterID;
@@ -101,6 +102,18 @@ public interface InstanceStateStore {
      */
     Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID,
                                                     Collection<InstanceState.STATE> states) throws StateStoreException;
+
+    /**
+     * Counts the execution instances of an entity on a cluster, grouped by current state.
+     * @param entity - entity whose instances are summarized
+     * @param cluster - cluster the instances belong to
+     * @param start - start of the instance time range (inclusive)
+     * @param end - end of the instance time range (exclusive)
+     * @return - A map of state and the no. of instances in that state; states with no instances are absent.
+     * @throws StateStoreException
+     */
+    Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+                                                               DateTime start, DateTime end) throws StateStoreException;
     /**
      * @param entity
      * @param cluster

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 3def14a..194819e 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
@@ -301,4 +303,22 @@ public final class BeanMapperUtil {
             IOUtils.closeQuietly(out);
         }
     }
+
+    /**
+     * @param summary - rows of (state name, count); null or empty gives an empty map, other-length rows are skipped
+     * @return A map of state and count given the JQL result.
+     */
+    public static Map<InstanceState.STATE, Long> getInstanceStateSummary(Collection<Object[]> summary) {
+        Map<InstanceState.STATE, Long> stateSummary = new HashMap<>();
+        if (summary == null) {
+            return stateSummary;
+        }
+        for (Object[] row : summary) {
+            if (row.length == 2) { // guards against out-of-bounds access on malformed rows
+                InstanceState.STATE state = InstanceState.STATE.valueOf((String) row[0]);
+                stateSummary.put(state, ((Number) row[1]).longValue());
+            }
+        }
+        return stateSummary;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 7f7b966..e8385b1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -50,7 +50,8 @@ import java.sql.Timestamp;
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
         @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
         @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
-        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+        @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
 })
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
 @Table(name = "INSTANCES")

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 2eafbce..1c07286 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.store.jdbc;
 
+import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
@@ -357,6 +358,22 @@ public final class JDBCStateStore extends AbstractStateStore {
     }
 
     @Override
+    public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+            DateTime start, DateTime end) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE");
+            q.setParameter("entityId", new EntityClusterID(entity, cluster).getEntityID().getKey());
+            q.setParameter("cluster", cluster);
+            q.setParameter("startTime", new Timestamp(start.getMillis()));
+            q.setParameter("endTime", new Timestamp(end.getMillis()));
+            return BeanMapperUtil.getInstanceStateSummary(q.getResultList()); // rows mapped before close
+        } finally {
+            entityManager.close(); // released even when building or running the query throws
+        }
+    }
+
+    @Override
     public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
                                                            Collection<InstanceState.STATE> states, DateTime start,
                                                            DateTime end) throws StateStoreException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index c9d6b86..7ce2420 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.workflow.engine;
 
+import java.util.HashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
@@ -398,7 +399,27 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
                                              List<LifeCycle> lifeCycles) throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+        List<InstancesSummaryResult.InstanceSummary> perClusterSummaries = new ArrayList<>();
+
+        // One summary per cluster (lifeCycles is not consulted); InstanceSummary is keyed by state name.
+        for (String cluster : clusters) {
+            LOG.debug("Retrieving summary of instances for cluster : {}", cluster);
+            Map<InstanceState.STATE, Long> countsByState = STATE_STORE.getExecutionInstanceSummary(entity,
+                    cluster, new DateTime(start), new DateTime(end));
+            Map<String, Long> countsByName = new HashMap<>();
+            for (Map.Entry<InstanceState.STATE, Long> entry : countsByState.entrySet()) {
+                countsByName.put(entry.getKey().name(), entry.getValue());
+            }
+            perClusterSummaries.add(new InstancesSummaryResult.InstanceSummary(cluster, countsByName));
+        }
+
+        InstancesSummaryResult result =
+                new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
+        InstancesSummaryResult.InstanceSummary[] summaryArray =
+                new InstancesSummaryResult.InstanceSummary[perClusterSummaries.size()];
+        result.setInstancesSummary(perClusterSummaries.toArray(summaryArray));
+        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index 2a383cc..d597e27 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.state.service.store;
 
+import java.util.Map;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
@@ -445,7 +446,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
         Assert.assertEquals(instances.size(), 0);
     }
 
+    @Test
+    public void testGetExecutionSummaryWithRange() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        long instance1Time = System.currentTimeMillis() - 180000; // 180000 ms = 3 minutes in the past
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "clicksProcess");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
 
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.SUCCEEDED);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        // Each one-minute window [t, t + 60000) below contains exactly one of the two stored instances.
+        Map<InstanceState.STATE, Long> summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+                "cluster1", new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+        Assert.assertEquals(summary.size(), 1);
+        Assert.assertEquals(summary.get(InstanceState.STATE.RUNNING).longValue(), 1L);
+
+        summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+                "cluster1", new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(summary.size(), 1);
+        Assert.assertEquals(summary.get(InstanceState.STATE.SUCCEEDED).longValue(), 1L);
+    }
 
     private void initInstanceState(InstanceState instanceState) {
         instanceState.setCurrentState(InstanceState.STATE.READY);

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index a82cf03..37221f3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -324,6 +324,12 @@ public class FalconUnitClient extends AbstractFalconClient {
                                                         String colo, List<LifeCycle> lifeCycles, String filterBy,
                                                         String orderBy, String sortOrder, String doAsUser) throws
             FalconCLIException {
+        if (StringUtils.isBlank(orderBy)) {
+            orderBy = DEFAULT_ORDERBY;
+        }
+        if (StringUtils.isBlank(sortOrder)) {
+            sortOrder = DEFAULT_SORTED_ORDER;
+        }
         return localInstanceManager.getSummary(type, entity, start, end, colo, lifeCycles, filterBy, orderBy,
                 sortOrder);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8a739e1f/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index b1c8ce0..b06725f 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -144,4 +144,28 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
         Assert.assertEquals(result.getInstances()[0].getInstance(), "2012-04-22T00:00Z");
         Assert.assertEquals(result.getInstances()[2].getInstance(), START_INSTANCE);
     }
+
+    @Test
+    public void testInstanceSummary() throws Exception {
+        UnitTestContext context = new UnitTestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        setupProcessExecution(context, overlay, 3); // NOTE(review): 3 presumably = instance count (2 RUNNING + 1 READY below)
+
+        String processName = overlay.get(PROCESS_NAME);
+        String colo = overlay.get(COLO);
+
+        waitForStatus(EntityType.PROCESS.toString(), processName,
+                START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
+
+        InstancesSummaryResult result = falconUnitClient.getSummaryOfInstances(EntityType.PROCESS.toString(),
+                processName, START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, null);
+
+        Assert.assertEquals(result.getInstancesSummary().length, 1);
+        Assert.assertEquals(result.getInstancesSummary()[0].getCluster(), overlay.get(CLUSTER));
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().size(), 2); // RUNNING and READY only
+        // Parallelism is 2: two instances are RUNNING and the remaining one stays READY.
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L);
+        Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L);
+    }
 }