You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/25 07:12:05 UTC
falcon git commit: FALCON-1758 APIs fail when oozie workflow entries
are deleted (Pavan Kolamuri)
Repository: falcon
Updated Branches:
refs/heads/master 46ecceba8 -> bd147972b
FALCON-1758 APIs fail when oozie workflow entries are deleted (Pavan Kolamuri)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bd147972
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bd147972
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bd147972
Branch: refs/heads/master
Commit: bd147972b9336c108d0660d9ba4dd91460963102
Parents: 46ecceb
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Jan 25 11:41:49 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Jan 25 11:41:49 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../execution/FalconExecutionService.java | 26 ++++++++++-------
.../falcon/execution/ProcessExecutor.java | 30 ++++++++++----------
.../org/apache/falcon/state/EntityState.java | 23 +++++++++++++--
.../falcon/state/EntityStateChangeHandler.java | 9 +++++-
.../org/apache/falcon/state/StateService.java | 13 ++++++---
.../falcon/state/store/AbstractStateStore.java | 4 +++
7 files changed, 74 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a86633f..d87ce26 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -120,6 +120,8 @@ Proposed Release Version: 0.9
OPTIMIZATIONS
BUG FIXES
+ FALCON-1758 APIs fail when oozie workflow entries are deleted (Pavan Kolamuri via Pallavi Rao)
+
FALCON-1754 JobCompletionService throws FalconException (Pallavi Rao)
FALCON-1716 API fails with CommunicationsException when mysql interaction time is longer than 53,434,939 milliseconds (Pavan Kolamuri via Pallavi Rao)
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index da1d7cc..93c894d 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -185,6 +185,20 @@ public final class FalconExecutionService implements FalconService, EntityStateC
}
}
+ @Override
+ public void onKill(Entity entity) throws FalconException {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityClusterID id = new EntityClusterID(entity, cluster);
+ if (!executors.containsKey(id)) {
+ LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
+ continue;
+ }
+ EntityExecutor executor = getEntityExecutor(entity, cluster);
+ executor.killAll();
+ executors.remove(executor.getId());
+ }
+ }
+
/**
* Schedules an entity.
*
@@ -222,18 +236,10 @@ public final class FalconExecutionService implements FalconService, EntityStateC
* @throws FalconException
*/
public void delete(Entity entity) throws FalconException {
- for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
- EntityClusterID id = new EntityClusterID(entity, cluster);
- if (!executors.containsKey(id)) {
- LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
- continue;
- }
- EntityExecutor executor = getEntityExecutor(entity, cluster);
- executor.killAll();
- executors.remove(executor.getId());
- }
+ StateService.get().handleStateChange(entity, EntityState.EVENT.KILL, this);
}
+
/**
* Returns the instance of {@link EntityExecutor} for a given entity and cluster.
*
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 188cec2..745d2ea 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -20,6 +20,8 @@ package org.apache.falcon.execution;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+
+import java.util.Collection;
import java.util.concurrent.ExecutionException;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
@@ -173,14 +175,10 @@ public class ProcessExecutor extends EntityExecutor {
// Error handling for an operation.
private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action)
throws StateStoreException {
- try {
- // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
- InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
- if (InstanceState.getTerminalStates().contains(currentState)) {
- return "";
- }
- } catch (StateStoreException sse) {
- throw sse;
+ // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
+ InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+ if (InstanceState.getTerminalStates().contains(currentState)) {
+ return "";
}
String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage();
@@ -226,23 +224,24 @@ public class ProcessExecutor extends EntityExecutor {
@Override
public void killAll() throws FalconException {
- NotificationServicesRegistry.unregister(executionService, getId());
StringBuffer errMsg = new StringBuffer();
- // Only active instances are in memory. Kill them first.
- for (ExecutionInstance instance : instances.asMap().values()) {
+ // Kill workflows in oozie.
+ for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+ InstanceState.getActiveStates())) {
+ ExecutionInstance instance = instanceState.getInstance();
try {
kill(instance);
} catch (FalconException e) {
- // Proceed with next
errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
}
}
- for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
- InstanceState.getActiveStates())) {
- ExecutionInstance instance = instanceState.getInstance();
+ // Kill active instances in memory.
+ Collection<ProcessExecutionInstance> execInstances = instances.asMap().values();
+ for (ExecutionInstance instance : execInstances) {
try {
kill(instance);
} catch (FalconException e) {
+ // Proceed with next
errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
}
}
@@ -250,6 +249,7 @@ public class ProcessExecutor extends EntityExecutor {
if (errMsg.length() != 0) {
throw new FalconException("Some instances failed to kill : " + errMsg.toString());
}
+ NotificationServicesRegistry.unregister(executionService, getId());
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index ae57fa1..38479a4 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -41,8 +41,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
return STATE.SCHEDULED;
case SUBMIT:
return this;
+ case KILL:
+ return STATE.KILLED;
default:
- throw new InvalidStateTransitionException("Submitted entities can only be scheduled.");
+ throw new InvalidStateTransitionException("Submitted entities can only be scheduled or killed.");
}
}
},
@@ -54,8 +56,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
return STATE.SUSPENDED;
case SCHEDULE:
return this;
+ case KILL:
+ return STATE.KILLED;
default:
- throw new InvalidStateTransitionException("Scheduled entities can only be suspended.");
+ throw new InvalidStateTransitionException("Scheduled entities can only be suspended or killed.");
}
}
},
@@ -67,8 +71,21 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
return STATE.SCHEDULED;
case SUSPEND:
return this;
+ case KILL:
+ return STATE.KILLED;
default:
- throw new InvalidStateTransitionException("Suspended entities can only be resumed.");
+ throw new InvalidStateTransitionException("Suspended entities can only be resumed or killed.");
+ }
+ }
+ },
+ KILLED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case KILL:
+ return STATE.KILLED;
+ default:
+ throw new InvalidStateTransitionException("Partially killed entities can only be killed.");
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
index 44ec3fc..79c6abd 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
@@ -50,10 +50,17 @@ public interface EntityStateChangeHandler {
void onSuspend(Entity entity) throws FalconException;
/**
- * Invoked when the an intity is resumed.
+ * Invoked when the an entity is resumed.
*
* @param entity
* @throws FalconException
*/
void onResume(Entity entity) throws FalconException;
+
+ /**
+ * Invoked when and entity is killed/deleted.
+ * @param entity
+ * @throws FalconException
+ */
+ void onKill(Entity entity) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index 9266354..638bb6e 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -81,10 +81,12 @@ public final class StateService {
EntityState entityState = stateStore.getEntity(id);
EntityState.STATE newState = entityState.nextTransition(event);
callbackHandler(entity, event, handler);
- entityState.setCurrentState(newState);
- stateStore.updateEntity(entityState);
- LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
- entityState.getCurrentState(), event.name());
+ if (newState != entityState.getCurrentState()) {
+ entityState.setCurrentState(newState);
+ stateStore.updateEntity(entityState);
+ LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
+ entityState.getCurrentState(), event.name());
+ }
}
}
@@ -107,6 +109,9 @@ public final class StateService {
case RESUME:
handler.onResume(entity);
break;
+ case KILL:
+ handler.onKill(entity);
+ break;
default: // Do nothing, only propagate events that originate from user
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/bd147972/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index 84d12f8..82cb659 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -38,6 +38,10 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
@Override
public void onAdd(Entity entity) throws FalconException {
if (entity.getEntityType() != EntityType.CLUSTER) {
+ EntityID entityID = new EntityID(entity);
+ if (entityExists(entityID)) {
+ deleteEntity(entityID);
+ }
putEntity(new EntityState(entity));
}
}