You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") is not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by so...@apache.org on 2015/09/10 00:25:34 UTC

falcon git commit: FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case. Contributed by Balu Vellanki.

Repository: falcon
Updated Branches:
  refs/heads/master 20f2acec7 -> 7624b2c8b


FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case. Contributed by Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7624b2c8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7624b2c8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7624b2c8

Branch: refs/heads/master
Commit: 7624b2c8b55e241eef60391534c39c719e0702dd
Parents: 20f2ace
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Wed Sep 9 15:25:27 2015 -0700
Committer: Sowmya Ramesh <sr...@hortonworks.com>
Committed: Wed Sep 9 15:25:27 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 common/pom.xml                                  | 14 ++--
 .../workflow/engine/AbstractWorkflowEngine.java | 16 ++++-
 .../workflow/engine/OozieWorkflowEngine.java    | 76 ++++++++++++++++----
 .../falcon/resource/AbstractEntityManager.java  | 20 +++---
 5 files changed, 102 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc1422a..45ead3f 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+
+    FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case(Balu Vellanki via Sowmya Ramesh)
+
     FALCON-1402 Validate cmd throws NPE when source cluster and any one of target cluster doesn't have overlapping dates(Pavan Kumar Kolamuri via Ajay Yadava)
 
     FALCON-1365 HCatReplication job fails with AccessControlException(Sowmya Ramesh via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 3a64751..0420b4c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -51,16 +51,20 @@
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-hdfs</artifactId>
                        <classifier>tests</classifier>
-                 </dependency>
-                 <dependency>
+                </dependency>
+                <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-common</artifactId>
                        <classifier>tests</classifier>
-                 </dependency>
-                 <dependency>
+                </dependency>
+                <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-common</artifactId>
-                 </dependency>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.oozie</groupId>
+                    <artifactId>oozie-client</artifactId>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 4d45cc7..ea86c2a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -24,12 +24,14 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.oozie.client.BundleJob;
 
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Date;
 
 
 /**
@@ -65,7 +67,17 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract boolean isActive(Entity entity) throws FalconException;
 
-    public abstract boolean isSuspended(Entity entity) throws FalconException;
+    public abstract boolean isActive(Map<String, BundleJob> bundles) throws FalconException;
+
+    public abstract boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException;
+
+    public abstract boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException;
+
+    public abstract boolean isFailed(Map<String, BundleJob> bundles) throws FalconException;
+
+    public abstract boolean isKilled(Map<String, BundleJob> bundles) throws FalconException;
+
+    public abstract Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException;
 
     public abstract InstancesResult getRunningInstances(Entity entity,
                                                         List<LifeCycle> lifeCycles) throws FalconException;

http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index f8b7764..5f79ca1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -106,6 +106,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Status.SUSPENDEDWITHERROR);
     private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING,
             Job.Status.RUNNINGWITHERROR);
+    private static final List<Job.Status> BUNDLE_SUCCEEDED_STATUS = Arrays.asList(Job.Status.SUCCEEDED);
+    private static final List<Job.Status> BUNDLE_FAILED_STATUS = Arrays.asList(Job.Status.FAILED,
+            Job.Status.DONEWITHERROR);
+    private static final List<Job.Status> BUNDLE_KILLED_STATUS = Arrays.asList(Job.Status.KILLED);
 
     private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND =
         Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
@@ -239,26 +243,52 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public boolean isActive(Entity entity) throws FalconException {
-        return isBundleInState(entity, BundleStatus.ACTIVE);
+        return isBundleInState(findLatestBundle(entity), BundleStatus.ACTIVE);
     }
 
     @Override
-    public boolean isSuspended(Entity entity) throws FalconException {
-        return isBundleInState(entity, BundleStatus.SUSPENDED);
+    public boolean isActive(Map<String, BundleJob> bundles) throws FalconException {
+        return isBundleInState(bundles, BundleStatus.ACTIVE);
+    }
+
+    @Override
+    public boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException {
+        return isBundleInState(bundles, BundleStatus.SUSPENDED);
+    }
+
+    @Override
+    public boolean isFailed(Map<String, BundleJob> bundles) throws FalconException {
+        return isBundleInState(bundles, BundleStatus.FAILED);
+    }
+
+    @Override
+    public boolean isKilled(Map<String, BundleJob> bundles) throws FalconException {
+        return isBundleInState(bundles, BundleStatus.KILLED);
+    }
+
+    @Override
+    public boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException {
+        return isBundleInState(bundles, BundleStatus.SUCCEEDED);
     }
 
     private enum BundleStatus {
-        ACTIVE, RUNNING, SUSPENDED
+        ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED
     }
 
-    private boolean isBundleInState(Entity entity, BundleStatus status) throws FalconException {
+    private boolean isBundleInState(Map<String, BundleJob> bundles,
+                                    BundleStatus status) throws FalconException {
 
-        Map<String, BundleJob> bundles = findLatestBundle(entity);
-        for (BundleJob bundle : bundles.values()) {
-            if (bundle == MISSING) {// There is no active bundle
-                return false;
+        // After removing MISSING bundles for clusters, if bundles.size() == 0, entity is not scheduled. Return false.
+        for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) {
+            if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster
+                bundles.remove(clusterBundle.getKey());
             }
+        }
+        if (bundles.size() == 0) {
+            return false;
+        }
 
+        for (BundleJob bundle : bundles.values()) {
             switch (status) {
             case ACTIVE:
                 if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) {
@@ -277,8 +307,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     return false;
                 }
                 break;
+
+            case FAILED:
+                if (!BUNDLE_FAILED_STATUS.contains(bundle.getStatus())) {
+                    return false;
+                }
+                break;
+
+            case KILLED:
+                if (!BUNDLE_KILLED_STATUS.contains(bundle.getStatus())) {
+                    return false;
+                }
+                break;
+
+            case SUCCEEDED:
+                if (!BUNDLE_SUCCEEDED_STATUS.contains(bundle.getStatus())) {
+                    return false;
+                }
+                break;
             default:
             }
+            LOG.debug("Bundle {} is in state {}", bundle.getAppName(), status.name());
         }
         return true;
     }
@@ -296,7 +345,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                         //Load bundle as coord info is not returned in getBundleJobsInfo()
                         BundleJob bundle = getBundleInfo(clusterName, job.getId());
                         filteredJobs.add(bundle);
-                        LOG.debug("Found bundle {} with app path {}", job.getId(), job.getAppPath());
+                        LOG.debug("Found bundle {} with app path {} and status {}",
+                                job.getId(), job.getAppPath(), job.getStatus());
                     }
                 }
             }
@@ -317,11 +367,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     //Return latest bundle(last created) for the entity for each cluster
-    private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
+    @Override
+    public Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
         Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>();
         for (String cluster : clusters) {
-            jobMap.put(cluster, findLatestBundle(entity, cluster));
+            BundleJob bundleJob = findLatestBundle(entity, cluster);
+            jobMap.put(cluster, bundleJob);
         }
         return jobMap;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 06ab4d9..63c5d39 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -48,6 +48,7 @@ import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.oozie.client.BundleJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -512,7 +513,7 @@ public abstract class AbstractEntityManager {
     }
 
     private enum EntityStatus {
-        SUBMITTED, SUSPENDED, RUNNING
+        SUBMITTED, SUSPENDED, RUNNING, SUCCEEDED, KILLED, FAILED
     }
 
     /**
@@ -541,20 +542,23 @@ public abstract class AbstractEntityManager {
     }
 
     protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException {
-        EntityStatus status;
+        EntityStatus status = EntityStatus.SUBMITTED;
+        Map<String, BundleJob> latestBundles = workflowEngine.findLatestBundle(entity);
 
         if (type.isSchedulable()) {
-            if (workflowEngine.isActive(entity)) {
-                if (workflowEngine.isSuspended(entity)) {
+            if (workflowEngine.isActive(latestBundles)) {
+                if (workflowEngine.isSuspended(latestBundles)) {
                     status = EntityStatus.SUSPENDED;
                 } else {
                     status = EntityStatus.RUNNING;
                 }
-            } else {
-                status = EntityStatus.SUBMITTED;
+            } else if (workflowEngine.isSucceeded(latestBundles)) {
+                status = EntityStatus.SUCCEEDED;
+            } else if (workflowEngine.isKilled(latestBundles)) {
+                status = EntityStatus.KILLED;
+            } else if (workflowEngine.isFailed(latestBundles)) {
+                status = EntityStatus.FAILED;
             }
-        } else {
-            status = EntityStatus.SUBMITTED;
         }
         return status;
     }