You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/07 12:42:14 UTC

falcon git commit: FALCON-298 Feed update with replication delay creates holes. Contributed by Sandeep Samudrala.

Repository: falcon
Updated Branches:
  refs/heads/master bd0028458 -> 717b472eb


FALCON-298 Feed update with replication delay creates holes. Contributed by Sandeep Samudrala.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/717b472e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/717b472e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/717b472e

Branch: refs/heads/master
Commit: 717b472eb15d45abfa94590940e5b131456fc6b1
Parents: bd00284
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Sep 7 14:10:34 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Sep 7 14:10:34 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../workflow/engine/OozieWorkflowEngine.java    | 29 ++++++++++++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 88d0f64..4b47d5b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-298 Feed update with replication delay creates holes(Sandeep Samudrala via Ajay Yadava)
+
     FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava)
 
     FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7e6cd6c..f8b7764 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieEntityBuilder;
@@ -1078,7 +1079,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 // only concurrency and endtime are changed. So, change coords
                 LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId());
                 updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
-                    EntityUtil.getEndTime(newEntity, cluster));
+                    EntityUtil.getEndTime(newEntity, cluster), newEntity);
                 return getUpdateString(newEntity, new Date(), bundle, bundle);
             }
 
@@ -1195,7 +1196,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private void updateCoords(String cluster, BundleJob bundle,
-                              int concurrency, Date endTime) throws FalconException {
+                              int concurrency, Date endTime, Entity entity) throws FalconException {
         if (endTime.compareTo(now()) <= 0) {
             throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
@@ -1206,8 +1207,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         // change coords
         for (CoordinatorJob coord : bundle.getCoordinators()) {
+
+            Frequency delay = null;
+            //get Delay to calculate coordinator end time in case of feed replication with delay.
+            if (entity.getEntityType().equals(EntityType.FEED)) {
+                delay = getDelay((Feed) entity, coord);
+            }
+
+            //calculate next start time based on delay.
+            endTime = (delay == null) ? endTime
+                    : EntityUtil.getNextStartTime(coord.getStartTime(), delay, EntityUtil.getTimeZone(entity), endTime);
             LOG.debug("Updating endtime of coord {} to {} on cluster {}",
                     coord.getId(), SchemaHelper.formatDateUTC(endTime), cluster);
+
             Date lastActionTime = getCoordLastActionTime(coord);
             if (lastActionTime == null) { // nothing is materialized
                 LOG.info("Nothing is materialized for this coord: {}", coord.getId());
@@ -1233,6 +1245,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    private Frequency getDelay(Feed entity, CoordinatorJob coord) { // first matching cluster's delay, or null
+        Feed feed = entity; // plain alias of the parameter; no copy is made
+        for (org.apache.falcon.entity.v0.feed.Cluster entityCluster : feed.getClusters().getClusters()){
+            if (coord.getAppName().contains(entityCluster.getName()) && coord.getAppName().contains("REPLICATION")
+                    && entityCluster.getDelay() != null){ // substring checks on app name; delay must be set
+                return entityCluster.getDelay(); // first match wins; later clusters are not examined
+            }
+        }
+        return null; // no cluster of the feed satisfied all three conditions
+    }
+
     private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
         String user, Boolean skipDryRun) throws FalconException {
         String clusterName = cluster.getName();
@@ -1246,7 +1269,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
 
         //Set end times for old coords
-        updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
+        updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity);
 
         //schedule new entity
         String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);