You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/19 18:14:37 UTC
[2/4] falcon git commit: FALCON-1727 Suspend fails with
InvalidStateTransitionException if entity has 'KILLED' instances (by Pallavi
Rao)
FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (by Pallavi Rao)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5fb3a7ab
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5fb3a7ab
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5fb3a7ab
Branch: refs/heads/master
Commit: 5fb3a7ab830255d9321ff4a5a8adb481c6c0d683
Parents: ccd536a
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Jan 19 19:27:21 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Jan 19 19:27:21 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/execution/ProcessExecutor.java | 39 +++++++++++++-------
.../org/apache/falcon/state/EntityState.java | 3 +-
3 files changed, 29 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f80e7a1..255706d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -116,6 +116,8 @@ Proposed Release Version: 0.9
OPTIMIZATIONS
BUG FIXES
+ FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (Pallavi Rao)
+
FALCON-1723 Rerun with skip fail actions won't work in few cases (Pavan Kolamuri via Pallavi Rao)
FALCON-1538 Prism status gives wrong info(Praveen Adlakha via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 40fe1b3..188cec2 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.Properties;
-import java.util.TimeZone;
/**
* This class is responsible for managing execution instances of a process.
@@ -153,8 +152,7 @@ public class ProcessExecutor extends EntityExecutor {
suspend(instance);
} catch (FalconException e) {
// Proceed with next
- errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
- LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
}
}
for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -163,8 +161,7 @@ public class ProcessExecutor extends EntityExecutor {
try {
suspend(instance);
} catch (FalconException e) {
- errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
- LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ errMsg.append(handleError(instance, e, EntityState.EVENT.SUSPEND));
}
}
// Some errors
@@ -173,6 +170,24 @@ public class ProcessExecutor extends EntityExecutor {
}
}
+ // Error handling for an operation.
+ private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action)
+ throws StateStoreException {
+ try {
+ // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
+ InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+ if (InstanceState.getTerminalStates().contains(currentState)) {
+ return "";
+ }
+ } catch (StateStoreException sse) {
+ throw sse;
+ }
+
+ String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage();
+ LOG.error(errMsg, e);
+ return errMsg;
+ }
+
// Returns last materialized instance's time.
private Date getLastInstanceTime() throws StateStoreException {
InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
@@ -198,8 +213,8 @@ public class ProcessExecutor extends EntityExecutor {
try {
resume(instance);
} catch (FalconException e) {
- errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
- LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ errMsg.append("Instance resume failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance resume failed for : " + instance.getId(), e);
}
}
registerForNotifications(getLastInstanceTime());
@@ -219,8 +234,7 @@ public class ProcessExecutor extends EntityExecutor {
kill(instance);
} catch (FalconException e) {
// Proceed with next
- errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
- LOG.error("Instance kill failed for : " + instance.getId(), e);
+ errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
}
}
for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
@@ -229,8 +243,7 @@ public class ProcessExecutor extends EntityExecutor {
try {
kill(instance);
} catch (FalconException e) {
- errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
- LOG.error("Instance kill failed for : " + instance.getId(), e);
+ errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
}
}
// Some errors
@@ -248,12 +261,10 @@ public class ProcessExecutor extends EntityExecutor {
LOG.error("Suspend failed for instance id : " + instance.getId(), e);
throw new FalconException("Suspend failed for instance : " + instance.getId(), e);
}
-
}
@Override
public void resume(ExecutionInstance instance) throws FalconException {
-
try {
instance.resume();
if (((ProcessExecutionInstance) instance).isScheduled()) {
@@ -452,7 +463,7 @@ public class ProcessExecutor extends EntityExecutor {
requestBuilder.setFrequency(process.getFrequency())
.setStartTime(new DateTime(startTime))
.setEndTime(new DateTime(endTime))
- .setTimeZone(TimeZone.getTimeZone("UTC"));
+ .setTimeZone(EntityUtil.getTimeZone(process));
NotificationServicesRegistry.register(requestBuilder.build());
LOG.info("Registered for a time based notification for process {} with frequency: {}, "
+ "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime);
http://git-wip-us.apache.org/repos/asf/falcon/blob/5fb3a7ab/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index f44f174..ae57fa1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -81,7 +81,8 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
SUBMIT,
SCHEDULE,
SUSPEND,
- RESUME
+ RESUME,
+ KILL
}
/**