You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/18 10:18:46 UTC
falcon git commit: FALCON-1442 Contract of WorkflowEngine API broken.
Contributed by Balu Vellanki.
Repository: falcon
Updated Branches:
refs/heads/master b806b32fd -> 52ce7f833
FALCON-1442 Contract of WorkflowEngine API broken. Contributed by Balu Vellanki.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/52ce7f83
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/52ce7f83
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/52ce7f83
Branch: refs/heads/master
Commit: 52ce7f8332e4f3dc9b0b644cb8cef3453ae7e1da
Parents: b806b32
Author: Ajay Yadava <aj...@gmail.com>
Authored: Fri Sep 18 13:48:02 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Sep 18 13:48:02 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
common/pom.xml | 4 ---
.../workflow/engine/AbstractWorkflowEngine.java | 14 ++--------
.../workflow/engine/OozieWorkflowEngine.java | 29 ++++++--------------
.../falcon/resource/AbstractEntityManager.java | 16 ++++-------
5 files changed, 17 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25f02f0..db11e40 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-1442 Contract of WorkflowEngine API broken(Balu Vellanki via Ajay Yadava)
+
FALCON-1460 Move getHiveCredentials method to ClusterHelper(Ajay Yadava via Sowmya Ramesh)
FALCON-1342 Do not allow duplicate properties in entities(Balu Vellanki via Sowmya Ramesh)
http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 0420b4c..37eb2d1 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -61,10 +61,6 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-client</artifactId>
- </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 265106b..78af6b2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -24,12 +24,10 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
-import org.apache.oozie.client.BundleJob;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -68,17 +66,9 @@ public abstract class AbstractWorkflowEngine {
public abstract boolean isActive(Entity entity) throws FalconException;
- public abstract boolean isActive(Map<String, BundleJob> bundles) throws FalconException;
+ public abstract boolean isSuspended(Entity entity) throws FalconException;
- public abstract boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException;
-
- public abstract boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException;
-
- public abstract boolean isFailed(Map<String, BundleJob> bundles) throws FalconException;
-
- public abstract boolean isKilled(Map<String, BundleJob> bundles) throws FalconException;
-
- public abstract Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException;
+ public abstract boolean isCompleted(Entity entity) throws FalconException;
public abstract InstancesResult getRunningInstances(Entity entity,
List<LifeCycle> lifeCycles) throws FalconException;
http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 96661ad..0441f7c 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -246,28 +246,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public boolean isActive(Map<String, BundleJob> bundles) throws FalconException {
- return isBundleInState(bundles, BundleStatus.ACTIVE);
+ public boolean isSuspended(Entity entity) throws FalconException {
+ return isBundleInState(findLatestBundle(entity), BundleStatus.SUSPENDED);
}
@Override
- public boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException {
- return isBundleInState(bundles, BundleStatus.SUSPENDED);
- }
-
- @Override
- public boolean isFailed(Map<String, BundleJob> bundles) throws FalconException {
- return isBundleInState(bundles, BundleStatus.FAILED);
- }
-
- @Override
- public boolean isKilled(Map<String, BundleJob> bundles) throws FalconException {
- return isBundleInState(bundles, BundleStatus.KILLED);
- }
-
- @Override
- public boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException {
- return isBundleInState(bundles, BundleStatus.SUCCEEDED);
+ public boolean isCompleted(Entity entity) throws FalconException {
+ Map<String, BundleJob> bundles = findLatestBundle(entity);
+ return (isBundleInState(bundles, BundleStatus.SUCCEEDED)
+ || isBundleInState(bundles, BundleStatus.FAILED)
+ || isBundleInState(bundles, BundleStatus.KILLED));
}
private enum BundleStatus {
@@ -366,8 +354,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
//Return latest bundle(last created) for the entity for each cluster
- @Override
- public Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
+ private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>();
for (String cluster : clusters) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/52ce7f83/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index b867055..bed0b6c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -48,7 +48,6 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
-import org.apache.oozie.client.BundleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -515,7 +514,7 @@ public abstract class AbstractEntityManager {
}
private enum EntityStatus {
- SUBMITTED, SUSPENDED, RUNNING, SUCCEEDED, KILLED, FAILED
+ SUBMITTED, SUSPENDED, RUNNING, COMPLETED
}
/**
@@ -545,21 +544,16 @@ public abstract class AbstractEntityManager {
protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException {
EntityStatus status = EntityStatus.SUBMITTED;
- Map<String, BundleJob> latestBundles = workflowEngine.findLatestBundle(entity);
if (type.isSchedulable()) {
- if (workflowEngine.isActive(latestBundles)) {
- if (workflowEngine.isSuspended(latestBundles)) {
+ if (workflowEngine.isActive(entity)) {
+ if (workflowEngine.isSuspended(entity)) {
status = EntityStatus.SUSPENDED;
} else {
status = EntityStatus.RUNNING;
}
- } else if (workflowEngine.isSucceeded(latestBundles)) {
- status = EntityStatus.SUCCEEDED;
- } else if (workflowEngine.isKilled(latestBundles)) {
- status = EntityStatus.KILLED;
- } else if (workflowEngine.isFailed(latestBundles)) {
- status = EntityStatus.FAILED;
+ } else if (workflowEngine.isCompleted(entity)) {
+ status = EntityStatus.COMPLETED;
}
}
return status;