You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/02/05 16:37:05 UTC
falcon git commit: FALCON-1795 Kill api not killing waiting/ready instances
Repository: falcon
Updated Branches:
refs/heads/master 943fc11d7 -> 49050c84c
FALCON-1795 Kill api not killing waiting/ready instances
Author: sandeep <sa...@gmail.com>
Reviewers: Ajay Yadava <aj...@apache.org>, Pavan Kumar, Peeyush Bishnoi
Closes #22 from sandeepSamudrala/master
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49050c84
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49050c84
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49050c84
Branch: refs/heads/master
Commit: 49050c84c5f6b3ff364d4b7ec795b9861f4d9973
Parents: 943fc11
Author: sandeep <sa...@gmail.com>
Authored: Fri Feb 5 21:06:39 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Feb 5 21:06:39 2016 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../workflow/engine/OozieWorkflowEngine.java | 20 ++++++++++++++++++-
.../client/LocalOozieClientCoordProxy.java | 21 ++++++++++++++++++++
.../oozie/client/LocalProxyOozieClient.java | 5 +++++
.../apache/falcon/unit/FalconUnitTestBase.java | 13 ++++++++++++
.../org/apache/falcon/unit/TestFalconUnit.java | 16 +++++++++++++++
6 files changed, 76 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dc3aa3..01e16b5 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,6 +30,8 @@ Trunk
FALCON-1770 Update README file (Ajay Yadava)
BUG FIXES
+ FALCON-1795 Kill api not killing waiting/ready instances
+
FALCON-1804 Non-SLA feed throws NullPointerException.
FALCON-1806 Update documentation for Import and Export. (Venkatesan Ramachandran via Balu Vellanki)
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 04f5e93..ebf23da 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -901,9 +901,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
switch (action) {
case KILL:
- if (jobInfo == null || !WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
+ if (jobInfo == null) {
+ StringBuilder scope = new StringBuilder();
+ scope.append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime())).append("::")
+ .append(SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime()));
+ kill(cluster, coordinatorAction.getJobId(), "date", scope.toString());
+ status = Status.KILLED.name();
break;
}
+ if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
+ break;
+ }
+
kill(cluster, jobInfo.getId());
status = Status.KILLED.name();
@@ -1628,6 +1637,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
+ private void kill(String cluster, String jobId, String rangeType, String scope) throws FalconException {
+ try {
+ OozieClientFactory.get(cluster).kill(jobId, rangeType, scope);
+ LOG.info("Killed job {} for instances {} on cluster {}", jobId, scope, cluster);
+ } catch (OozieClientException e) {
+ throw new FalconException(e);
+ }
+ }
+
private void kill(String cluster, String jobId) throws FalconException {
try {
OozieClientFactory.get(cluster).kill(jobId);
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
index ff4561b..d0bfbfc 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientCoordProxy.java
@@ -23,6 +23,8 @@ import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorEngineException;
import org.apache.oozie.LocalOozieClientCoord;
+import java.util.List;
+
/**
* Client API to submit and manage Oozie Coord jobs against an Oozie
* intance.
@@ -75,4 +77,23 @@ public class LocalOozieClientCoordProxy extends LocalOozieClientCoord {
throw new OozieClientException(e.getErrorCode().toString(), e);
}
}
+
+ /**
+ * Kill coordinator actions.
+ *
+ * @param jobId coordinator Job Id
+ * @param rangeType type 'date' if -date is used, 'action-num' if -action is used
+ * @param scope kill scope for date or action nums
+ * @return list of coordinator actions that underwent kill
+ * @throws OozieClientException thrown if some actions could not be killed.
+ */
+ @Override
+ public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
+ try {
+ coordEngine.killActions(jobId, rangeType, scope).getCoordActions();
+ } catch (CoordinatorEngineException e) {
+ throw new OozieClientException(e.getErrorCode().toString(), e);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 81f4c54..7bf5c37 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -193,6 +193,11 @@ public class LocalProxyOozieClient extends OozieClient {
}
@Override
+ public List<CoordinatorAction> kill(String jobId, String rangeType, String scope) throws OozieClientException {
+ return getClient(jobId).kill(jobId, rangeType, scope);
+ }
+
+ @Override
public void change(final String jobId, final String changeValue) throws OozieClientException {
getClient(jobId).change(jobId, changeValue);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index fb30a55..70e1de9 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -285,6 +285,19 @@ public class FalconUnitTestBase {
fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath));
}
+ public void deleteData(String feedName, String cluster) throws FalconException, ParseException, IOException {
+ Entity existingEntity = configStore.get(EntityType.FEED, feedName);
+ if (existingEntity == null) {
+ throw new FalconException("Feed Not Found " + feedName);
+ }
+ Feed feed = (Feed) existingEntity;
+ Storage rawStorage = FeedHelper.createStorage(cluster, feed);
+ String feedPathTemplate = rawStorage.getUriTemplate(LocationType.DATA);
+
+ Path feedBasePath = FeedHelper.getFeedBasePath(feedPathTemplate);
+ fs.delete(feedBasePath, true);
+ }
+
protected String getFeedPathForTS(String cluster, String feedName,
String timeStamp) throws FalconException, ParseException {
Entity existingEntity = configStore.get(EntityType.FEED, feedName);
http://git-wip-us.apache.org/repos/asf/falcon/blob/49050c84/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 7c76660..aaf2b37 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -231,6 +231,22 @@ public class TestFalconUnit extends FalconUnitTestBase {
}
@Test
+ public void testKillWaitingInstances() throws Exception {
+ submitClusterAndFeeds();
+ InstancesResult.WorkflowStatus currentStatus;
+ deleteData(INPUT_FEED_NAME, CLUSTER_NAME);
+ submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+ scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW), true, "");
+
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.WAITING);
+ getClient().killInstances(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, END_TIME, null,
+ CLUSTER_NAME, null, null, null);
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME, InstancesResult.WorkflowStatus.KILLED);
+ currentStatus = getClient().getInstanceStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+ Assert.assertEquals(currentStatus, InstancesResult.WorkflowStatus.KILLED);
+ }
+
+ @Test
public void testProcessInstanceManagementAPI1() throws Exception {
submitClusterAndFeeds();
// submitting and scheduling process