You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/01/25 07:07:25 UTC

falcon git commit: FALCON-1758 APIs fail when oozie workflow entries are deleted (By Pavan Kolamuri)

Repository: falcon
Updated Branches:
  refs/heads/0.9 44f15153b -> f3cfe690c


FALCON-1758 APIs fail when oozie workflow entries are deleted (By Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f3cfe690
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f3cfe690
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f3cfe690

Branch: refs/heads/0.9
Commit: f3cfe690c80fdd7c221329c36f2e3c9827e8708a
Parents: 44f1515
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Jan 25 10:12:30 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Jan 25 10:12:30 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../execution/FalconExecutionService.java       | 26 ++++++++++-------
 .../falcon/execution/ProcessExecutor.java       | 30 ++++++++++----------
 .../org/apache/falcon/state/EntityState.java    | 23 +++++++++++++--
 .../falcon/state/EntityStateChangeHandler.java  |  9 +++++-
 .../org/apache/falcon/state/StateService.java   | 13 ++++++---
 .../falcon/state/store/AbstractStateStore.java  |  4 +++
 7 files changed, 74 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 790f7e5..d4aa15d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -106,6 +106,8 @@ Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1758 APIs fail when oozie workflow entries are deleted (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1754 JobCompletionService throws FalconException (Pallavi Rao)
 
     FALCON-1716 API fails with CommunicationsException when mysql interaction time is longer than 53,434,939 milliseconds (Pavan Kolamuri via Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index da1d7cc..93c894d 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -185,6 +185,20 @@ public final class FalconExecutionService implements FalconService, EntityStateC
         }
     }
 
+    @Override
+    public void onKill(Entity entity) throws FalconException {
+        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+            EntityClusterID id = new EntityClusterID(entity, cluster);
+            if (!executors.containsKey(id)) {
+                LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
+                continue;
+            }
+            EntityExecutor executor = getEntityExecutor(entity, cluster);
+            executor.killAll();
+            executors.remove(executor.getId());
+        }
+    }
+
     /**
      * Schedules an entity.
      *
@@ -222,18 +236,10 @@ public final class FalconExecutionService implements FalconService, EntityStateC
      * @throws FalconException
      */
     public void delete(Entity entity) throws FalconException {
-        for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-            EntityClusterID id = new EntityClusterID(entity, cluster);
-            if (!executors.containsKey(id)) {
-                LOG.info("Entity {} is already deleted on cluster {}.", id, cluster);
-                continue;
-            }
-            EntityExecutor executor = getEntityExecutor(entity, cluster);
-            executor.killAll();
-            executors.remove(executor.getId());
-        }
+        StateService.get().handleStateChange(entity, EntityState.EVENT.KILL, this);
     }
 
+
     /**
      * Returns the instance of {@link EntityExecutor} for a given entity and cluster.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 188cec2..745d2ea 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -20,6 +20,8 @@ package org.apache.falcon.execution;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+
+import java.util.Collection;
 import java.util.concurrent.ExecutionException;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
@@ -173,14 +175,10 @@ public class ProcessExecutor extends EntityExecutor {
     // Error handling for an operation.
     private String handleError(ExecutionInstance instance, FalconException e, EntityState.EVENT action)
         throws StateStoreException {
-        try {
-            // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
-            InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
-            if (InstanceState.getTerminalStates().contains(currentState)) {
-                return "";
-            }
-        } catch (StateStoreException sse) {
-            throw sse;
+        // If the instance terminated while a kill/suspend operation was in progress, ignore the exception.
+        InstanceState.STATE currentState = STATE_STORE.getExecutionInstance(instance.getId()).getCurrentState();
+        if (InstanceState.getTerminalStates().contains(currentState)) {
+            return "";
         }
 
         String errMsg = "Instance " + action.name() + " failed for: " + instance.getId() + " due to " + e.getMessage();
@@ -226,23 +224,24 @@ public class ProcessExecutor extends EntityExecutor {
 
     @Override
     public void killAll() throws FalconException {
-        NotificationServicesRegistry.unregister(executionService, getId());
         StringBuffer errMsg = new StringBuffer();
-        // Only active instances are in memory. Kill them first.
-        for (ExecutionInstance instance : instances.asMap().values()) {
+        // Kill workflows in oozie.
+        for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+                InstanceState.getActiveStates())) {
+            ExecutionInstance instance = instanceState.getInstance();
             try {
                 kill(instance);
             } catch (FalconException e) {
-                // Proceed with next
                 errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
-        for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
-                InstanceState.getActiveStates())) {
-            ExecutionInstance instance = instanceState.getInstance();
+        // Kill active instances in memory.
+        Collection<ProcessExecutionInstance> execInstances = instances.asMap().values();
+        for (ExecutionInstance instance : execInstances) {
             try {
                 kill(instance);
             } catch (FalconException e) {
+                // Proceed with next
                 errMsg.append(handleError(instance, e, EntityState.EVENT.KILL));
             }
         }
@@ -250,6 +249,7 @@ public class ProcessExecutor extends EntityExecutor {
         if (errMsg.length() != 0) {
             throw new FalconException("Some instances failed to kill : " + errMsg.toString());
         }
+        NotificationServicesRegistry.unregister(executionService, getId());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index ae57fa1..38479a4 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -41,8 +41,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
                     return STATE.SCHEDULED;
                 case SUBMIT:
                     return this;
+                case KILL:
+                    return STATE.KILLED;
                 default:
-                    throw new InvalidStateTransitionException("Submitted entities can only be scheduled.");
+                    throw new InvalidStateTransitionException("Submitted entities can only be scheduled or killed.");
                 }
             }
         },
@@ -54,8 +56,10 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
                     return STATE.SUSPENDED;
                 case SCHEDULE:
                     return this;
+                case KILL:
+                    return STATE.KILLED;
                 default:
-                    throw new InvalidStateTransitionException("Scheduled entities can only be suspended.");
+                    throw new InvalidStateTransitionException("Scheduled entities can only be suspended or killed.");
                 }
             }
         },
@@ -67,8 +71,21 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
                     return STATE.SCHEDULED;
                 case SUSPEND:
                     return this;
+                case KILL:
+                    return STATE.KILLED;
                 default:
-                    throw new InvalidStateTransitionException("Suspended entities can only be resumed.");
+                    throw new InvalidStateTransitionException("Suspended entities can only be resumed or killed.");
+                }
+            }
+        },
+        KILLED {
+            @Override
+            public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+                switch (event) {
+                case KILL:
+                    return STATE.KILLED;
+                default:
+                    throw new InvalidStateTransitionException("Partially killed entities can only be killed.");
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
index 44ec3fc..79c6abd 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
@@ -50,10 +50,17 @@ public interface EntityStateChangeHandler {
     void onSuspend(Entity entity) throws FalconException;
 
     /**
-     * Invoked when the an intity is resumed.
+     * Invoked when the an entity is resumed.
      *
      * @param entity
      * @throws FalconException
      */
     void onResume(Entity entity) throws FalconException;
+
+    /**
+     * Invoked when and entity is killed/deleted.
+     * @param entity
+     * @throws FalconException
+     */
+    void onKill(Entity entity) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index 9266354..638bb6e 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -81,10 +81,12 @@ public final class StateService {
             EntityState entityState = stateStore.getEntity(id);
             EntityState.STATE newState = entityState.nextTransition(event);
             callbackHandler(entity, event, handler);
-            entityState.setCurrentState(newState);
-            stateStore.updateEntity(entityState);
-            LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
-                    entityState.getCurrentState(), event.name());
+            if (newState != entityState.getCurrentState()) {
+                entityState.setCurrentState(newState);
+                stateStore.updateEntity(entityState);
+                LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
+                        entityState.getCurrentState(), event.name());
+            }
         }
     }
 
@@ -107,6 +109,9 @@ public final class StateService {
         case RESUME:
             handler.onResume(entity);
             break;
+        case KILL:
+            handler.onKill(entity);
+            break;
         default: // Do nothing, only propagate events that originate from user
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f3cfe690/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index 84d12f8..82cb659 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -38,6 +38,10 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
     @Override
     public void onAdd(Entity entity) throws FalconException {
         if (entity.getEntityType() != EntityType.CLUSTER) {
+            EntityID entityID = new EntityID(entity);
+            if (entityExists(entityID)) {
+                deleteEntity(entityID);
+            }
             putEntity(new EntityState(entity));
         }
     }