You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/07 12:42:14 UTC
falcon git commit: FALCON-298 Feed update with replication delay
creates holes. Contributed by Sandeep Samudrala.
Repository: falcon
Updated Branches:
refs/heads/master bd0028458 -> 717b472eb
FALCON-298 Feed update with replication delay creates holes. Contributed by Sandeep Samudrala.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/717b472e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/717b472e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/717b472e
Branch: refs/heads/master
Commit: 717b472eb15d45abfa94590940e5b131456fc6b1
Parents: bd00284
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Sep 7 14:10:34 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Sep 7 14:10:34 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../workflow/engine/OozieWorkflowEngine.java | 29 ++++++++++++++++++--
2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 88d0f64..4b47d5b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-298 Feed update with replication delay creates holes(Sandeep Samudrala via Ajay Yadava)
+
FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava)
FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7e6cd6c..f8b7764 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieBundleBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
@@ -1078,7 +1079,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
// only concurrency and endtime are changed. So, change coords
LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId());
updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
- EntityUtil.getEndTime(newEntity, cluster));
+ EntityUtil.getEndTime(newEntity, cluster), newEntity);
return getUpdateString(newEntity, new Date(), bundle, bundle);
}
@@ -1195,7 +1196,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void updateCoords(String cluster, BundleJob bundle,
- int concurrency, Date endTime) throws FalconException {
+ int concurrency, Date endTime, Entity entity) throws FalconException {
if (endTime.compareTo(now()) <= 0) {
throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
}
@@ -1206,8 +1207,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
// change coords
for (CoordinatorJob coord : bundle.getCoordinators()) {
+
+ Frequency delay = null;
+ //get Delay to calculate coordinator end time in case of feed replication with delay.
+ if (entity.getEntityType().equals(EntityType.FEED)) {
+ delay = getDelay((Feed) entity, coord);
+ }
+
+ //calculate next start time based on delay.
+ endTime = (delay == null) ? endTime
+ : EntityUtil.getNextStartTime(coord.getStartTime(), delay, EntityUtil.getTimeZone(entity), endTime);
LOG.debug("Updating endtime of coord {} to {} on cluster {}",
coord.getId(), SchemaHelper.formatDateUTC(endTime), cluster);
+
Date lastActionTime = getCoordLastActionTime(coord);
if (lastActionTime == null) { // nothing is materialized
LOG.info("Nothing is materialized for this coord: {}", coord.getId());
@@ -1233,6 +1245,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
+ private Frequency getDelay(Feed entity, CoordinatorJob coord) {
+ Feed feed = entity;
+ for (org.apache.falcon.entity.v0.feed.Cluster entityCluster : feed.getClusters().getClusters()){
+ if (coord.getAppName().contains(entityCluster.getName()) && coord.getAppName().contains("REPLICATION")
+ && entityCluster.getDelay() != null){
+ return entityCluster.getDelay();
+ }
+ }
+ return null;
+ }
+
private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
String user, Boolean skipDryRun) throws FalconException {
String clusterName = cluster.getName();
@@ -1246,7 +1269,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
//Set end times for old coords
- updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
+ updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
//schedule new entity
String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);