You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by so...@apache.org on 2015/09/10 00:25:34 UTC
falcon git commit: FALCON-1371 Status of scheduled Process entity is
shown as submitted in corner case. Contributed by Balu Vellanki.
Repository: falcon
Updated Branches:
refs/heads/master 20f2acec7 -> 7624b2c8b
FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case. Contributed by Balu Vellanki.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7624b2c8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7624b2c8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7624b2c8
Branch: refs/heads/master
Commit: 7624b2c8b55e241eef60391534c39c719e0702dd
Parents: 20f2ace
Author: Sowmya Ramesh <sr...@hortonworks.com>
Authored: Wed Sep 9 15:25:27 2015 -0700
Committer: Sowmya Ramesh <sr...@hortonworks.com>
Committed: Wed Sep 9 15:25:27 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
common/pom.xml | 14 ++--
.../workflow/engine/AbstractWorkflowEngine.java | 16 ++++-
.../workflow/engine/OozieWorkflowEngine.java | 76 ++++++++++++++++----
.../falcon/resource/AbstractEntityManager.java | 20 +++---
5 files changed, 102 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc1422a..45ead3f 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+
+ FALCON-1371 Status of scheduled Process entity is shown as submitted in corner case(Balu Vellanki via Sowmya Ramesh)
+
FALCON-1402 Validate cmd throws NPE when source cluster and any one of target cluster doesn't have overlapping dates(Pavan Kumar Kolamuri via Ajay Yadava)
FALCON-1365 HCatReplication job fails with AccessControlException(Sowmya Ramesh via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 3a64751..0420b4c 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -51,16 +51,20 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
- </dependency>
- <dependency>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
- </dependency>
- <dependency>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-client</artifactId>
+ </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 4d45cc7..ea86c2a 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -24,12 +24,14 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.oozie.client.BundleJob;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.Date;
/**
@@ -65,7 +67,17 @@ public abstract class AbstractWorkflowEngine {
public abstract boolean isActive(Entity entity) throws FalconException;
- public abstract boolean isSuspended(Entity entity) throws FalconException;
+ public abstract boolean isActive(Map<String, BundleJob> bundles) throws FalconException;
+
+ public abstract boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException;
+
+ public abstract boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException;
+
+ public abstract boolean isFailed(Map<String, BundleJob> bundles) throws FalconException;
+
+ public abstract boolean isKilled(Map<String, BundleJob> bundles) throws FalconException;
+
+ public abstract Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException;
public abstract InstancesResult getRunningInstances(Entity entity,
List<LifeCycle> lifeCycles) throws FalconException;
http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index f8b7764..5f79ca1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -106,6 +106,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Status.SUSPENDEDWITHERROR);
private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING,
Job.Status.RUNNINGWITHERROR);
+ private static final List<Job.Status> BUNDLE_SUCCEEDED_STATUS = Arrays.asList(Job.Status.SUCCEEDED);
+ private static final List<Job.Status> BUNDLE_FAILED_STATUS = Arrays.asList(Job.Status.FAILED,
+ Job.Status.DONEWITHERROR);
+ private static final List<Job.Status> BUNDLE_KILLED_STATUS = Arrays.asList(Job.Status.KILLED);
private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND =
Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
@@ -239,26 +243,52 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public boolean isActive(Entity entity) throws FalconException {
- return isBundleInState(entity, BundleStatus.ACTIVE);
+ return isBundleInState(findLatestBundle(entity), BundleStatus.ACTIVE);
}
@Override
- public boolean isSuspended(Entity entity) throws FalconException {
- return isBundleInState(entity, BundleStatus.SUSPENDED);
+ public boolean isActive(Map<String, BundleJob> bundles) throws FalconException {
+ return isBundleInState(bundles, BundleStatus.ACTIVE);
+ }
+
+ @Override
+ public boolean isSuspended(Map<String, BundleJob> bundles) throws FalconException {
+ return isBundleInState(bundles, BundleStatus.SUSPENDED);
+ }
+
+ @Override
+ public boolean isFailed(Map<String, BundleJob> bundles) throws FalconException {
+ return isBundleInState(bundles, BundleStatus.FAILED);
+ }
+
+ @Override
+ public boolean isKilled(Map<String, BundleJob> bundles) throws FalconException {
+ return isBundleInState(bundles, BundleStatus.KILLED);
+ }
+
+ @Override
+ public boolean isSucceeded(Map<String, BundleJob> bundles) throws FalconException {
+ return isBundleInState(bundles, BundleStatus.SUCCEEDED);
}
private enum BundleStatus {
- ACTIVE, RUNNING, SUSPENDED
+ ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED
}
- private boolean isBundleInState(Entity entity, BundleStatus status) throws FalconException {
+ private boolean isBundleInState(Map<String, BundleJob> bundles,
+ BundleStatus status) throws FalconException {
- Map<String, BundleJob> bundles = findLatestBundle(entity);
- for (BundleJob bundle : bundles.values()) {
- if (bundle == MISSING) {// There is no active bundle
- return false;
+ // Drop clusters whose latest bundle is MISSING; if no bundles remain afterwards, the entity is not scheduled, so return false.
+ for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) {
+ if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster
+ bundles.remove(clusterBundle.getKey());
}
+ }
+ if (bundles.size() == 0) {
+ return false;
+ }
+ for (BundleJob bundle : bundles.values()) {
switch (status) {
case ACTIVE:
if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) {
@@ -277,8 +307,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return false;
}
break;
+
+ case FAILED:
+ if (!BUNDLE_FAILED_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
+
+ case KILLED:
+ if (!BUNDLE_KILLED_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
+
+ case SUCCEEDED:
+ if (!BUNDLE_SUCCEEDED_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
default:
}
+ LOG.debug("Bundle {} is in state {}", bundle.getAppName(), status.name());
}
return true;
}
@@ -296,7 +345,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
//Load bundle as coord info is not returned in getBundleJobsInfo()
BundleJob bundle = getBundleInfo(clusterName, job.getId());
filteredJobs.add(bundle);
- LOG.debug("Found bundle {} with app path {}", job.getId(), job.getAppPath());
+ LOG.debug("Found bundle {} with app path {} and status {}",
+ job.getId(), job.getAppPath(), job.getStatus());
}
}
}
@@ -317,11 +367,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
//Return latest bundle(last created) for the entity for each cluster
- private Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
+ @Override
+ public Map<String, BundleJob> findLatestBundle(Entity entity) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
Map<String, BundleJob> jobMap = new HashMap<String, BundleJob>();
for (String cluster : clusters) {
- jobMap.put(cluster, findLatestBundle(entity, cluster));
+ BundleJob bundleJob = findLatestBundle(entity, cluster);
+ jobMap.put(cluster, bundleJob);
}
return jobMap;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7624b2c8/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 06ab4d9..63c5d39 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -48,6 +48,7 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
+import org.apache.oozie.client.BundleJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -512,7 +513,7 @@ public abstract class AbstractEntityManager {
}
private enum EntityStatus {
- SUBMITTED, SUSPENDED, RUNNING
+ SUBMITTED, SUSPENDED, RUNNING, SUCCEEDED, KILLED, FAILED
}
/**
@@ -541,20 +542,23 @@ public abstract class AbstractEntityManager {
}
protected EntityStatus getStatus(Entity entity, EntityType type) throws FalconException {
- EntityStatus status;
+ EntityStatus status = EntityStatus.SUBMITTED;
+ Map<String, BundleJob> latestBundles = workflowEngine.findLatestBundle(entity);
if (type.isSchedulable()) {
- if (workflowEngine.isActive(entity)) {
- if (workflowEngine.isSuspended(entity)) {
+ if (workflowEngine.isActive(latestBundles)) {
+ if (workflowEngine.isSuspended(latestBundles)) {
status = EntityStatus.SUSPENDED;
} else {
status = EntityStatus.RUNNING;
}
- } else {
- status = EntityStatus.SUBMITTED;
+ } else if (workflowEngine.isSucceeded(latestBundles)) {
+ status = EntityStatus.SUCCEEDED;
+ } else if (workflowEngine.isKilled(latestBundles)) {
+ status = EntityStatus.KILLED;
+ } else if (workflowEngine.isFailed(latestBundles)) {
+ status = EntityStatus.FAILED;
}
- } else {
- status = EntityStatus.SUBMITTED;
}
return status;
}