You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/02/21 14:28:51 UTC
git commit: FALCON-297 Validations on update with effective time.
Contributed by Shwetha GS
Repository: incubator-falcon
Updated Branches:
refs/heads/master 2570f1513 -> b42f740a8
FALCON-297 Validations on update with effective time. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b42f740a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b42f740a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b42f740a
Branch: refs/heads/master
Commit: b42f740a886f436fba95eb0a4758f8439d1d8a0b
Parents: 2570f15
Author: Shwetha GS <sh...@gmail.com>
Authored: Fri Feb 21 18:58:43 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Fri Feb 21 18:58:43 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../workflow/engine/OozieWorkflowEngine.java | 66 +++++++++++---------
.../falcon/resource/AbstractEntityManager.java | 12 +++-
3 files changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41882b1..de3acef 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,8 @@ Trunk (Unreleased)
FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
+ FALCON-297 Validations on update with effective time. (Shwetha GS)
+
FALCON-278 Changes in feed availability info doesn't update process. (Shwetha GS)
FALCON-239 Build failed on build-tools due to a missing SNAPSHOT. (Srikanth
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index d1ced99..9372ba4 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1040,30 +1040,44 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
OozieWorkflowBuilder<Entity> builder =
(OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
- // Change end time of coords and schedule new bundle
Job.Status oldBundleStatus = oldBundle.getStatus();
+ //Suspend coords as bundle suspend doesn't suspend coords synchronously
suspendCoords(cluster, oldBundle);
Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
Path stagingPath = EntityUtil.getLatestStagingPath(clusterEntity, oldEntity);
- BundleJob newBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
+ //find last scheduled bundle
+ BundleJob latestBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
Date effectiveTime;
- if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath()) || newBundle == null || !alreadyCreated) {
+ if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath()) || latestBundle == null || !alreadyCreated) {
// new entity is not scheduled yet, create new bundle
LOG.info("New bundle hasn't been created yet. So will create one");
+
+ //pick effective time as now() + 3 min to handle any time diff between falcon and oozie
+ //oozie rejects changes with endtime < now
effectiveTime = offsetTime(now(), 3);
if (inEffectiveTime != null && inEffectiveTime.after(effectiveTime)) {
+ //If the user has specified effective time and is valid, pick user specified effective time
effectiveTime = inEffectiveTime;
}
+
+ //pick start time for new bundle which is after effectiveTime
effectiveTime = builder.getNextStartTime(newEntity, cluster, effectiveTime);
- newBundle =
- getBundleInfo(cluster, scheduleForUpdate(newEntity, cluster, effectiveTime, oldBundle.getUser()));
- LOG.info("New bundle " + newBundle.getId() + " scheduled successfully with start time "
+
+ //schedule new bundle
+ String newBundleId = scheduleForUpdate(newEntity, cluster, effectiveTime, oldBundle.getUser());
+ //newBundleId and latestBundle will be null if effectiveTime = process end time
+ if (newBundleId != null) {
+ latestBundle = getBundleInfo(cluster, newBundleId);
+ LOG.info("New bundle " + newBundleId + " scheduled successfully with start time "
+ SchemaHelper.formatDateUTC(effectiveTime));
+ }
} else {
- LOG.info("New bundle has already been created. Bundle Id: " + newBundle.getId() + ", Start: "
- + SchemaHelper.formatDateUTC(newBundle.getStartTime()) + ", End: " + newBundle.getEndTime());
- effectiveTime = getMinStartTime(newBundle);
+ LOG.info("New bundle has already been created. Bundle Id: " + latestBundle.getId() + ", Start: "
+ + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
+
+ //pick effectiveTime from already created bundle
+ effectiveTime = getMinStartTime(latestBundle);
LOG.info("Will set old coord end time to " + SchemaHelper.formatDateUTC(effectiveTime));
}
if (effectiveTime != null) {
@@ -1072,11 +1086,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
if (oldBundleStatus != Job.Status.SUSPENDED && oldBundleStatus != Job.Status.PREPSUSPENDED) {
+ //resume coords
resumeCoords(cluster, oldBundle);
}
- //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
- commitStagingPath(cluster, newBundle.getAppPath());
+ //latestBundle will be null if effectiveTime = process end time
+ if (latestBundle != null) {
+ //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
+ commitStagingPath(cluster, latestBundle.getAppPath());
+ }
return effectiveTime;
}
@@ -1105,8 +1123,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return startTime;
}
- private BundleJob getBundleInfo(String cluster, String bundleId)
- throws FalconException {
+ private BundleJob getBundleInfo(String cluster, String bundleId) throws FalconException {
try {
return OozieClientFactory.get(cluster).getBundleJobInfo(bundleId);
} catch (OozieClientException e) {
@@ -1237,7 +1254,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
try {
OozieClientFactory.get(cluster).suspend(jobId);
assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED,
- Status.FAILED, Status.KILLED);
+ Status.FAILED, Status.KILLED);
LOG.info("Suspended job " + jobId + " on cluster " + cluster);
} catch (OozieClientException e) {
throw new FalconException(e);
@@ -1247,8 +1264,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void resume(String cluster, String jobId) throws FalconException {
try {
OozieClientFactory.get(cluster).resume(jobId);
- assertStatus(cluster, jobId, Status.RUNNING, Status.SUCCEEDED,
- Status.FAILED, Status.KILLED);
+ assertStatus(cluster, jobId, Status.PREP, Status.RUNNING, Status.SUCCEEDED, Status.FAILED, Status.KILLED);
LOG.info("Resumed job " + jobId + " on cluster " + cluster);
} catch (OozieClientException e) {
throw new FalconException(e);
@@ -1280,22 +1296,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void change(String cluster, String id, int concurrency,
Date endTime, String pauseTime) throws FalconException {
StringBuilder changeValue = new StringBuilder();
- changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=")
- .append(concurrency).append(";");
+ changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=").append(concurrency).append(";");
if (endTime != null) {
String endTimeStr = SchemaHelper.formatDateUTC(endTime);
- changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=")
- .append(endTimeStr).append(";");
+ changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=").append(endTimeStr).append(";");
}
if (pauseTime != null) {
- changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=")
- .append(pauseTime);
+ changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=").append(pauseTime);
}
String changeValueStr = changeValue.toString();
if (changeValue.toString().endsWith(";")) {
- changeValueStr = changeValue.substring(0,
- changeValueStr.length() - 1);
+ changeValueStr = changeValue.substring(0, changeValueStr.length() - 1);
}
change(cluster, id, changeValueStr);
@@ -1305,8 +1317,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
ProxyOozieClient client = OozieClientFactory.get(cluster);
CoordinatorJob coord = client.getCoordJobInfo(id);
for (int counter = 0; counter < 3; counter++) {
- Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null
- : SchemaHelper.parseDateUTC(pauseTime));
+ Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime));
if (coord.getConcurrency() != concurrency
|| (endTime != null && !coord.getEndTime().equals(endTime))
|| (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
@@ -1357,8 +1368,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
instance.cluster = cluster;
instances[0] = instance;
- return new InstancesResult("Instance for workflow id:" + jobId,
- instances);
+ return new InstancesResult("Instance for workflow id:" + jobId, instances);
} catch (Exception e) {
throw new FalconException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b42f740a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index c353935..2eeaf8a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -241,7 +241,8 @@ public abstract class AbstractEntityManager {
oldClusters.removeAll(newClusters); //deleted clusters
for (String cluster : newClusters) {
- Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, effectiveTime);
+ Date myEffectiveTime = validateEffectiveTime(newEntity, cluster, effectiveTime);
+ Date effectiveEndTime = getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime);
if (effectiveEndTime != null) {
effectiveTimes.add("(" + cluster + ", " + SchemaHelper.formatDateUTC(effectiveEndTime) + ")");
}
@@ -263,6 +264,15 @@ public abstract class AbstractEntityManager {
}
}
+ private Date validateEffectiveTime(Entity entity, String cluster, Date effectiveTime) {
+ Date start = EntityUtil.getStartTime(entity, cluster);
+ Date end = EntityUtil.getEndTime(entity, cluster);
+ if (effectiveTime == null || effectiveTime.before(start) || effectiveTime.after(end)) {
+ return null;
+ }
+ return effectiveTime;
+ }
+
private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
throw new FalconException(