You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/02/21 14:28:51 UTC

git commit: FALCON-297 Validations on update with effective time. Contributed by Shwetha GS

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 2570f1513 -> b42f740a8


FALCON-297 Validations on update with effective time. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b42f740a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b42f740a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b42f740a

Branch: refs/heads/master
Commit: b42f740a886f436fba95eb0a4758f8439d1d8a0b
Parents: 2570f15
Author: Shwetha GS <sh...@gmail.com>
Authored: Fri Feb 21 18:58:43 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Fri Feb 21 18:58:43 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../workflow/engine/OozieWorkflowEngine.java    | 66 +++++++++++---------
 .../falcon/resource/AbstractEntityManager.java  | 12 +++-
 3 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41882b1..de3acef 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,8 @@ Trunk (Unreleased)
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
+    FALCON-297 Validations on update with effective time. (Shwetha GS)
+
     FALCON-278 Changes in feed availability info doesn't update process. (Shwetha GS)
 
     FALCON-239 Build failed on build-tools due to a missing SNAPSHOT. (Srikanth 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index d1ced99..9372ba4 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1040,30 +1040,44 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         OozieWorkflowBuilder<Entity> builder =
                 (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
 
-        // Change end time of coords and schedule new bundle
         Job.Status oldBundleStatus = oldBundle.getStatus();
+        //Suspend coords as bundle suspend doesn't suspend coords synchronously
         suspendCoords(cluster, oldBundle);
 
         Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
         Path stagingPath = EntityUtil.getLatestStagingPath(clusterEntity, oldEntity);
-        BundleJob newBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
+        //find last scheduled bundle
+        BundleJob latestBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
         Date effectiveTime;
-        if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath()) || newBundle == null || !alreadyCreated) {
+        if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath()) || latestBundle == null || !alreadyCreated) {
             // new entity is not scheduled yet, create new bundle
             LOG.info("New bundle hasn't been created yet. So will create one");
+
+            //pick effective time as now() + 3 min to handle any time diff between falcon and oozie
+            //oozie rejects changes with endtime < now
             effectiveTime = offsetTime(now(), 3);
             if (inEffectiveTime != null && inEffectiveTime.after(effectiveTime)) {
+                //If the user has specified effective time and is valid, pick user specified effective time
                 effectiveTime = inEffectiveTime;
             }
+
+            //pick start time for new bundle which is after effectiveTime
             effectiveTime = builder.getNextStartTime(newEntity, cluster, effectiveTime);
-            newBundle =
-                    getBundleInfo(cluster, scheduleForUpdate(newEntity, cluster, effectiveTime, oldBundle.getUser()));
-            LOG.info("New bundle " + newBundle.getId() + " scheduled successfully with start time "
+
+            //schedule new bundle
+            String newBundleId = scheduleForUpdate(newEntity, cluster, effectiveTime, oldBundle.getUser());
+            //newBundleId and latestBundle will be null if effectiveTime = process end time
+            if (newBundleId != null) {
+                latestBundle = getBundleInfo(cluster, newBundleId);
+                LOG.info("New bundle " + newBundleId + " scheduled successfully with start time "
                     + SchemaHelper.formatDateUTC(effectiveTime));
+            }
         } else {
-            LOG.info("New bundle has already been created. Bundle Id: " + newBundle.getId() + ", Start: "
-                    + SchemaHelper.formatDateUTC(newBundle.getStartTime()) + ", End: " + newBundle.getEndTime());
-            effectiveTime = getMinStartTime(newBundle);
+            LOG.info("New bundle has already been created. Bundle Id: " + latestBundle.getId() + ", Start: "
+                    + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
+
+            //pick effectiveTime from already created bundle
+            effectiveTime = getMinStartTime(latestBundle);
             LOG.info("Will set old coord end time to " + SchemaHelper.formatDateUTC(effectiveTime));
         }
         if (effectiveTime != null) {
@@ -1072,11 +1086,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
 
         if (oldBundleStatus != Job.Status.SUSPENDED && oldBundleStatus != Job.Status.PREPSUSPENDED) {
+            //resume coords
             resumeCoords(cluster, oldBundle);
         }
 
-        //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
-        commitStagingPath(cluster, newBundle.getAppPath());
+        //latestBundle will be null if effectiveTime = process end time
+        if (latestBundle != null) {
+            //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
+            commitStagingPath(cluster, latestBundle.getAppPath());
+        }
         return effectiveTime;
     }
 
@@ -1105,8 +1123,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return startTime;
     }
 
-    private BundleJob getBundleInfo(String cluster, String bundleId)
-        throws FalconException {
+    private BundleJob getBundleInfo(String cluster, String bundleId) throws FalconException {
         try {
             return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId);
         } catch (OozieClientException e) {
@@ -1237,7 +1254,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         try {
             OozieClientFactory.get(cluster).suspend(jobId);
             assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED,
-                    Status.FAILED, Status.KILLED);
+                Status.FAILED, Status.KILLED);
             LOG.info("Suspended job " + jobId + " on cluster " + cluster);
         } catch (OozieClientException e) {
             throw new FalconException(e);
@@ -1247,8 +1264,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private void resume(String cluster, String jobId) throws FalconException {
         try {
             OozieClientFactory.get(cluster).resume(jobId);
-            assertStatus(cluster, jobId, Status.RUNNING, Status.SUCCEEDED,
-                    Status.FAILED, Status.KILLED);
+            assertStatus(cluster, jobId, Status.PREP, Status.RUNNING, Status.SUCCEEDED, Status.FAILED, Status.KILLED);
             LOG.info("Resumed job " + jobId + " on cluster " + cluster);
         } catch (OozieClientException e) {
             throw new FalconException(e);
@@ -1280,22 +1296,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private void change(String cluster, String id, int concurrency,
                         Date endTime, String pauseTime) throws FalconException {
         StringBuilder changeValue = new StringBuilder();
-        changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=")
-                .append(concurrency).append(";");
+        changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=").append(concurrency).append(";");
         if (endTime != null) {
             String endTimeStr = SchemaHelper.formatDateUTC(endTime);
-            changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=")
-                    .append(endTimeStr).append(";");
+            changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=").append(endTimeStr).append(";");
         }
         if (pauseTime != null) {
-            changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=")
-                    .append(pauseTime);
+            changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=").append(pauseTime);
         }
 
         String changeValueStr = changeValue.toString();
         if (changeValue.toString().endsWith(";")) {
-            changeValueStr = changeValue.substring(0,
-                    changeValueStr.length() - 1);
+            changeValueStr = changeValue.substring(0, changeValueStr.length() - 1);
         }
 
         change(cluster, id, changeValueStr);
@@ -1305,8 +1317,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             ProxyOozieClient client = OozieClientFactory.get(cluster);
             CoordinatorJob coord = client.getCoordJobInfo(id);
             for (int counter = 0; counter < 3; counter++) {
-                Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null
-                        : SchemaHelper.parseDateUTC(pauseTime));
+                Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime));
                 if (coord.getConcurrency() != concurrency
                         || (endTime != null && !coord.getEndTime().equals(endTime))
                         || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
@@ -1357,8 +1368,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
             instance.cluster = cluster;
             instances[0] = instance;
-            return new InstancesResult("Instance for workflow id:" + jobId,
-                    instances);
+            return new InstancesResult("Instance for workflow id:" + jobId, instances);
         } catch (Exception e) {
             throw new FalconException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c353935..2eeaf8a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -241,7 +241,8 @@ public abstract class AbstractEntityManager {
                 oldClusters.removeAll(newClusters); //deleted clusters
 
                 for (String cluster : newClusters) {
-                    Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, effectiveTime);
+                    Date myEffectiveTime = validateEffectiveTime(newEntity, cluster, effectiveTime);
+                    Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime);
                     if (effectiveEndTime != null) {
                         effectiveTimes.add("(" + cluster + ", " + SchemaHelper.formatDateUTC(effectiveEndTime) + ")");
                     }
@@ -263,6 +264,15 @@ public abstract class AbstractEntityManager {
         }
     }
 
+    private Date validateEffectiveTime(Entity entity, String cluster, Date effectiveTime) {
+        Date start = EntityUtil.getStartTime(entity, cluster);
+        Date end = EntityUtil.getEndTime(entity, cluster);
+        if (effectiveTime == null || effectiveTime.before(start) || effectiveTime.after(end)) {
+            return null;
+        }
+        return effectiveTime;
+    }
+
     private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
         if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
             throw new FalconException(