You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/02/05 16:37:05 UTC

falcon git commit: FALCON-1795 Kill api not killing waiting/ready instances

Repository: falcon
Updated Branches:
  refs/heads/master 943fc11d7 -> 49050c84c


FALCON-1795 Kill api not killing waiting/ready instances

Author: sandeep <sa...@gmail.com>

Reviewers: Ajay Yadava <aj...@apache.org>, Pavan Kumar, Peeyush Bishnoi

Closes #22 from sandeepSamudrala/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49050c84
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49050c84
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49050c84

Branch: refs/heads/master
Commit: 49050c84c5f6b3ff364d4b7ec795b9861f4d9973
Parents: 943fc11
Author: sandeep <sa...@gmail.com>
Authored: Fri Feb 5 21:06:39 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Feb 5 21:06:39 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../workflow/engine/OozieWorkflowEngine.java    | 20 ++++++++++++++++++-
 .../client/LocalOozieClientCoordProxy.java      | 21 ++++++++++++++++++++
 .../oozie/client/LocalProxyOozieClient.java     |  5 +++++
 .../apache/falcon/unit/FalconUnitTestBase.java  | 13 ++++++++++++
 .../org/apache/falcon/unit/TestFalconUnit.java  | 16 +++++++++++++++
 6 files changed, 76 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dc3aa3..01e16b5 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,6 +30,8 @@ Trunk
     FALCON-1770 Update README file (Ajay Yadava)
 
   BUG FIXES
+    FALCON-1795 Kill api not killing waiting/ready instances
+   
     FALCON-1804 Non-SLA feed throws NullPointerException.
     
     FALCON-1806 Update documentation for Import and Export. (Venkatesan Ramachandran via Balu Vellanki)

http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 04f5e93..ebf23da 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -901,9 +901,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         switch (action) {
         case KILL:
-            if (jobInfo == null || !WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
+            if (jobInfo == null) {
+                StringBuilder scope = new StringBuilder();
+                scope.append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())).append("::")
+                        .append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime()));
+                kill(cluster, coordinatorAction.getJobId(), "date", scope.toString());
+                status = Status.KILLED.name();
                 break;
             }
+            if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
+                break;
+            }
+
 
             kill(cluster, jobInfo.getId());
             status = Status.KILLED.name();
@@ -1628,6 +1637,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
+    /**
+     * Kills a range of instances of an Oozie job using the Oozie client of the given cluster.
+     *
+     * @param cluster cluster whose Oozie client receives the request
+     * @param jobId id of the job the instances belong to
+     * @param rangeType how {@code scope} is to be interpreted; handed to the client unchanged
+     * @param scope the instances to kill; handed to the client unchanged
+     * @throws FalconException wrapping any OozieClientException raised by the client
+     */
+    private void kill(String cluster, String jobId, String rangeType, String scope) throws FalconException {
+        try {
+            OozieClientFactory.get(cluster).kill(jobId, rangeType, scope);
+        } catch (OozieClientException e) {
+            throw new FalconException(e);
+        }
+        // Reached only when the client call returned normally.
+        LOG.info("Killed job {} for instances {} on cluster {}", jobId, scope, cluster);
+    }
+
     private void kill(String cluster, String jobId) throws FalconException {
         try {
             OozieClientFactory.get(cluster).kill(jobId);

http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
index ff4561b..d0bfbfc 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
@@ -23,6 +23,8 @@ import org.apache.oozie.CoordinatorEngine;
 import org.apache.oozie.CoordinatorEngineException;
 import org.apache.oozie.LocalOozieClientCoord;
 
+import java.util.List;
+
 /**
  * Client API to submit and manage Oozie Coord jobs against an Oozie
  * instance.
@@ -75,4 +77,23 @@ public class LocalOozieClientCoordProxy extends LocalOozieClientCoord {
             throw new OozieClientException(e.getErrorCode().toString(), e);
         }
     }
+
+    /**
+     * Kill coordinator actions.
+     *
+     * @param jobId coordinator Job Id
+     * @param rangeType type 'date' if -date is used, 'action-num' if -action is used
+     * @param scope kill scope for date or action nums
+     * @return list of coordinator actions that underwent kill; empty (never null) when there are none
+     * @throws OozieClientException thrown if some actions could not be killed.
+     */
+    @Override
+    public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
+        try {
+            List<? extends CoordinatorAction> killed = coordEngine.killActions(jobId, rangeType, scope)
+                    .getCoordActions();
+            // Hand the killed actions back to the caller instead of discarding them: copy them into a
+            // list of the client-facing element type so callers never receive null.
+            List<CoordinatorAction> result = new java.util.ArrayList<CoordinatorAction>();
+            if (killed != null) {
+                result.addAll(killed);
+            }
+            return result;
+        } catch (CoordinatorEngineException e) {
+            throw new OozieClientException(e.getErrorCode().toString(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 81f4c54..7bf5c37 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -193,6 +193,11 @@ public class LocalProxyOozieClient extends OozieClient {
     }
 
     @Override
+    public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
+        return getClient(jobId).kill(jobId, rangeType, scope); // forward to the client picked for jobId
+    }
+
+    @Override
     public void change(final String jobId, final String changeValue) throws OozieClientException {
         getClient(jobId).change(jobId, changeValue);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index fb30a55..70e1de9 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -285,6 +285,19 @@ public class FalconUnitTestBase {
         fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath));
     }
 
+    /**
+     * Deletes the data base path of a feed on the given cluster.
+     *
+     * The feed is looked up in the config store, its DATA location template is resolved for the
+     * cluster, and the base path derived from that template is removed through {@code fs}.
+     *
+     * @param feedName name of the feed whose data is removed
+     * @param cluster name of the cluster used to resolve the feed storage
+     * @throws FalconException if no feed with the given name is found in the config store
+     * @throws ParseException declared for callers; not raised directly in this method
+     * @throws IOException if the file system delete fails
+     */
+    public void deleteData(String feedName, String cluster) throws FalconException, ParseException, IOException {
+        Entity storedFeed = configStore.get(EntityType.FEED, feedName);
+        if (storedFeed == null) {
+            throw new FalconException("Feed Not Found  " + feedName);
+        }
+
+        String dataTemplate = FeedHelper.createStorage(cluster, (Feed) storedFeed)
+                .getUriTemplate(LocationType.DATA);
+        // Second argument is true: presumably a recursive delete of everything under the base path.
+        fs.delete(FeedHelper.getFeedBasePath(dataTemplate), true);
+    }
+
     protected String getFeedPathForTS(String cluster, String feedName,
                                       String timeStamp) throws FalconException, ParseException {
         Entity existingEntity = configStore.get(EntityType.FEED, feedName);

http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 7c76660..aaf2b37 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -231,6 +231,22 @@ public class TestFalconUnit extends FalconUnitTestBase {
     }
 
     @Test
+    public void testKillWaitingInstances() throws Exception {
+        final String processType = EntityType.PROCESS.name();
+
+        submitClusterAndFeeds();
+        // Input feed data is removed before scheduling; presumably this is what keeps the instance WAITING.
+        deleteData(INPUT_FEED_NAME, CLUSTER_NAME);
+        submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+        scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, "");
+        waitForStatus(processType, PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.WAITING);
+
+        // Kill the instances between SCHEDULE_TIME and END_TIME, then expect the instance to end up KILLED.
+        getClient().killInstances(processType, PROCESS_NAME, SCHEDULE_TIME, END_TIME, null,
+                CLUSTER_NAME, null, null, null);
+        waitForStatus(processType, PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.KILLED);
+
+        InstancesResult.WorkflowStatus statusAfterKill =
+                getClient().getInstanceStatus(processType, PROCESS_NAME, SCHEDULE_TIME);
+        Assert.assertEquals(statusAfterKill, InstancesResult.WorkflowStatus.KILLED);
+    }
+
+    @Test
     public void testProcessInstanceManagementAPI1() throws Exception {
         submitClusterAndFeeds();
         // submitting and scheduling process