You are viewing a plain-text version of this content; the canonical link to the original (shown as "here") is not preserved in this copy.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/19 18:14:42 UTC
[4/5] falcon git commit: FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)
FALCON-1742 Implement instance summary api for native scheduler (By Pallavi Rao)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2590096e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2590096e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2590096e
Branch: refs/heads/0.9
Commit: 2590096ec5bb57bef3f4a62422194e888d12dca2
Parents: f0893f7
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 21:53:54 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 21:53:54 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../falcon/resource/EntitySummaryResult.java | 2 +-
.../falcon/state/store/InMemoryStateStore.java | 21 ++++++++++++
.../falcon/state/store/InstanceStateStore.java | 13 ++++++++
.../falcon/state/store/jdbc/BeanMapperUtil.java | 20 +++++++++++
.../falcon/state/store/jdbc/InstanceBean.java | 3 +-
.../falcon/state/store/jdbc/JDBCStateStore.java | 17 ++++++++++
.../workflow/engine/FalconWorkflowEngine.java | 23 ++++++++++++-
.../state/service/store/TestJDBCStateStore.java | 35 ++++++++++++++++++++
.../apache/falcon/unit/FalconUnitClient.java | 6 ++++
.../InstanceSchedulerManagerJerseyIT.java | 24 ++++++++++++++
11 files changed, 163 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 531c4a5..2747185 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,6 +22,8 @@ Proposed Release Version: 0.9
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1742 Implement instance summary api for native scheduler (Pallavi Rao)
+
FALCON-1677 Support re-tries for timed-out instances (Narayan Periwal via Pallavi Rao)
FALCON-1643 Add CLI option to display captured replication metrics(Peeyush Bishnoi via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
index 4a885ec..3ebfe26 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntitySummaryResult.java
@@ -35,7 +35,7 @@ public class EntitySummaryResult extends APIResult {
* Workflow status as being set in result object.
*/
public static enum WorkflowStatus {
- WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR
+ WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR, READY // READY added by FALCON-1742 (native scheduler instance summary)
}
@XmlElement
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index c4ced46..69f1e48 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -218,6 +218,27 @@ public final class InMemoryStateStore extends AbstractStateStore {
}
@Override
+ public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+ DateTime start, DateTime end) throws StateStoreException {
+ Map<InstanceState.STATE, Long> summary = new HashMap<>();
+ for (InstanceState state : getAllExecutionInstances(entity, cluster)) {
+ ExecutionInstance instance = state.getInstance();
+ DateTime instanceTime = instance.getInstanceTime();
+ // Window is [start, end): start inclusive, end exclusive, same bounds as the JDBC summary query.
+ // Hence start == end selects nothing, and states with no matching instance never appear in the map.
+ if ((instanceTime.isEqual(start) || instanceTime.isAfter(start))
+ && instanceTime.isBefore(end)) {
+ if (summary.containsKey(state.getCurrentState())) {
+ summary.put(state.getCurrentState(), summary.get(state.getCurrentState()) + 1L);
+ } else {
+ summary.put(state.getCurrentState(), 1L);
+ }
+ }
+ }
+ return summary;
+ }
+
+ @Override
public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
EntityClusterID id = new EntityClusterID(entity, cluster);
if (!entityStates.containsKey(id.getEntityID().getKey())) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 8ce8fa0..b7269f8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.state.store;
+import java.util.Map;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.state.EntityClusterID;
@@ -101,6 +102,18 @@ public interface InstanceStateStore {
*/
Collection<InstanceState> getExecutionInstances(EntityClusterID entityClusterID,
Collection<InstanceState.STATE> states) throws StateStoreException;
+
+ /**
+ * Counts the instances of an entity on a cluster, grouped by their current state.
+ * @param entity entity whose instances are counted
+ * @param cluster cluster the instances belong to
+ * @param start start of the instance-time window (inclusive)
+ * @param end end of the instance-time window (exclusive)
+ * @return map of state to the number of instances in that state; states with no instances are omitted
+ * @throws StateStoreException
+ */
+ Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+ DateTime start, DateTime end) throws StateStoreException;
/**
* @param entity
* @param cluster
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 3def14a..194819e 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.falcon.state.store.jdbc;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
@@ -301,4 +303,22 @@ public final class BeanMapperUtil {
IOUtils.closeQuietly(out);
}
}
+
+ /**
+ * @param summary JPQL result rows of [state name (String), count (Number)]; may be null or empty
+ * @return map of state to count; empty when summary is null or empty. Rows without exactly 2 columns are skipped.
+ */
+ public static Map<InstanceState.STATE, Long> getInstanceStateSummary(Collection<Object[]> summary) {
+ Map<InstanceState.STATE, Long> stateSummary = new HashMap<>();
+ if (summary != null && !summary.isEmpty()) {
+ for (Object[] projection : summary) {
+ // Expect exactly 2 columns (state name, count); other rows are skipped rather than indexed out of bounds.
+ if (projection.length == 2) {
+ stateSummary.put(InstanceState.STATE.valueOf((String)projection[0]),
+ Long.valueOf(((Number)projection[1]).longValue()));
+ }
+ }
+ }
+ return stateSummary;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
index 7f7b966..e8385b1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -50,7 +50,8 @@ import java.sql.Timestamp;
@NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
@NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
@NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
- @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+ @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+ @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
@Table(name = "INSTANCES")
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 2eafbce..1c07286 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.state.store.jdbc;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.exception.StateStoreException;
@@ -357,6 +358,22 @@ public final class JDBCStateStore extends AbstractStateStore {
}
@Override
+ public Map<InstanceState.STATE, Long> getExecutionInstanceSummary(Entity entity, String cluster,
+ DateTime start, DateTime end) throws StateStoreException {
+ EntityManager entityManager = getEntityManager();
+ try {
+ Query q = entityManager.createNamedQuery("GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE");
+ q.setParameter("entityId", new EntityClusterID(entity, cluster).getEntityID().getKey());
+ q.setParameter("cluster", cluster);
+ q.setParameter("startTime", new Timestamp(start.getMillis()));
+ q.setParameter("endTime", new Timestamp(end.getMillis()));
+ return BeanMapperUtil.getInstanceStateSummary(q.getResultList());
+ } finally {
+ entityManager.close();
+ }
+ }
+
+ @Override
public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
Collection<InstanceState.STATE> states, DateTime start,
DateTime end) throws StateStoreException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index eb39ec0..34bcf01 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -18,6 +18,7 @@
package org.apache.falcon.workflow.engine;
+import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
@@ -398,7 +399,27 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
@Override
public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
List<LifeCycle> lifeCycles) throws FalconException {
- throw new FalconException("Not yet Implemented");
+ Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+ List<InstancesSummaryResult.InstanceSummary> instanceSummaries = new ArrayList<>();
+
+ // One InstanceSummary per cluster from getClustersDefinedInColos; lifeCycles is not used here.
+ for (String cluster : clusters) {
+ LOG.debug("Retrieving summary of instances for cluster : {}", cluster);
+ Map<InstanceState.STATE, Long> summaries = STATE_STORE.getExecutionInstanceSummary(entity, cluster,
+ new DateTime(start), new DateTime(end));
+ Map<String, Long> summaryMap = new HashMap<>();
+ // InstanceSummary is keyed by state name, so convert each STATE key via name().
+ for (Map.Entry<InstanceState.STATE, Long> summary : summaries.entrySet()) {
+ summaryMap.put(summary.getKey().name(), summary.getValue());
+ }
+ instanceSummaries.add(new InstancesSummaryResult.InstanceSummary(cluster, summaryMap));
+ }
+ // Status is always SUCCEEDED here; state-store failures propagate as exceptions instead.
+ InstancesSummaryResult instancesSummaryResult =
+ new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
+ instancesSummaryResult.setInstancesSummary(instanceSummaries.
+ toArray(new InstancesSummaryResult.InstanceSummary[instanceSummaries.size()]));
+ return instancesSummaryResult;
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index 2a383cc..d597e27 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.state.service.store;
+import java.util.Map;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
@@ -445,7 +446,41 @@ public class TestJDBCStateStore extends AbstractSchedulerTestBase {
Assert.assertEquals(instances.size(), 0);
}
+ @Test
+ public void testGetExecutionSummaryWithRange() throws Exception {
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ // Two instances of the same process on cluster1, 3 minutes (180000 ms) apart.
+ long instance1Time = System.currentTimeMillis() - 180000;
+ long instance2Time = System.currentTimeMillis();
+ EntityState entityState = getEntityState(EntityType.PROCESS, "clicksProcess");
+ ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ instance1Time, "cluster1", instance1Time);
+ InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+ instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+ ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ instance2Time, "cluster1", instance2Time);
+ InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+ instanceState2.setCurrentState(InstanceState.STATE.SUCCEEDED);
+
+ stateStore.putExecutionInstance(instanceState1);
+ stateStore.putExecutionInstance(instanceState2);
+ // Each 1-minute window [t, t + 60000) starting at an instance's time should contain only that instance,
+ // since the other instance lies 3 minutes away.
+ Map<InstanceState.STATE, Long> summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+ "cluster1", new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+ Assert.assertEquals(summary.size(), 1);
+ Assert.assertEquals(summary.get(InstanceState.STATE.RUNNING).longValue(), 1L);
+
+ summary = stateStore.getExecutionInstanceSummary(entityState.getEntity(),
+ "cluster1", new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+ Assert.assertEquals(summary.size(), 1);
+ Assert.assertEquals(summary.get(InstanceState.STATE.SUCCEEDED).longValue(), 1L);
+ }
private void initInstanceState(InstanceState instanceState) {
instanceState.setCurrentState(InstanceState.STATE.READY);
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 13375ef..7371e3a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -324,6 +324,12 @@ public class FalconUnitClient extends AbstractFalconClient {
String colo, List<LifeCycle> lifeCycles, String filterBy,
String orderBy, String sortOrder, String doAsUser) throws
FalconCLIException {
+ if (StringUtils.isBlank(orderBy)) {
+ orderBy = DEFAULT_ORDERBY;
+ }
+ if (StringUtils.isBlank(sortOrder)) {
+ sortOrder = DEFAULT_SORTED_ORDER;
+ }
return localInstanceManager.getSummary(type, entity, start, end, colo, lifeCycles, filterBy, orderBy,
sortOrder);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/2590096e/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
index 698580b..aac7e26 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/InstanceSchedulerManagerJerseyIT.java
@@ -144,4 +144,28 @@ public class InstanceSchedulerManagerJerseyIT extends AbstractSchedulerManagerJe
Assert.assertEquals(result.getInstances()[0].getInstance(), "2012-04-22T00:00Z");
Assert.assertEquals(result.getInstances()[2].getInstance(), START_INSTANCE);
}
+
+ @Test
+ public void testInstanceSummary() throws Exception {
+ UnitTestContext context = new UnitTestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ setupProcessExecution(context, overlay, 3);
+
+ String processName = overlay.get(PROCESS_NAME);
+ String colo = overlay.get(COLO);
+
+ waitForStatus(EntityType.PROCESS.toString(), processName,
+ START_INSTANCE, InstancesResult.WorkflowStatus.RUNNING);
+ // Null orderBy/sortOrder are replaced with defaults by FalconUnitClient.getSummaryOfInstances.
+ InstancesSummaryResult result = falconUnitClient.getSummaryOfInstances(EntityType.PROCESS.toString(),
+ processName, START_INSTANCE, "2012-04-23T00:00Z", colo, null, null, null, null, null);
+ // Expect a single cluster summary containing exactly two states: RUNNING and READY.
+ Assert.assertEquals(result.getInstancesSummary().length, 1);
+ Assert.assertEquals(result.getInstancesSummary()[0].getCluster(), overlay.get(CLUSTER));
+ Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().size(), 2);
+ // Process parallelism is presumably 2: two instances RUNNING while the third stays READY.
+ Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("RUNNING").longValue(), 2L);
+ Assert.assertEquals(result.getInstancesSummary()[0].getSummaryMap().get("READY").longValue(), 1L);
+ }
}