You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/14 07:40:05 UTC
falcon git commit: FALCON-1519 Suspend And Resume API's in Falcon
Unit(Narayan Periwal)
Repository: falcon
Updated Branches:
refs/heads/master 4a64dec0e -> e6d63f9b8
FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e6d63f9b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e6d63f9b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e6d63f9b
Branch: refs/heads/master
Commit: e6d63f9b894a08dea46abc52c90c32d022a46708
Parents: 4a64dec
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Oct 14 11:09:54 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Oct 14 11:09:54 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/client/AbstractFalconClient.java | 36 ++++++++++++++
.../oozie/client/LocalProxyOozieClient.java | 20 +++++++-
.../apache/falcon/unit/FalconUnitClient.java | 18 +++++++
.../unit/LocalSchedulableEntityManager.java | 16 +++++++
.../apache/falcon/unit/FalconUnitTestBase.java | 11 +++--
.../org/apache/falcon/unit/TestFalconUnit.java | 30 ++++++++++++
unit/src/test/resources/process1.xml | 50 ++++++++++++++++++++
8 files changed, 178 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2dc7e3c..e92c81e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,8 @@ Trunk (Unreleased)
FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
IMPROVEMENTS
+ FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal via Pallavi Rao)
+
FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)
FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao)
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 2358289..b889931 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -81,4 +81,40 @@ public abstract class AbstractFalconClient {
Integer offset, Integer numResults,
String doAsUser) throws FalconCLIException;
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ /**
+ * Suspend an entity.
+ * @param entityType Valid options are feed or process.
+ * @param entityName Name of the entity.
+ * @param colo Colo on which the query should be run.
+ * @param doAsUser proxy user
+ * @return Status of the entity.
+ * @throws FalconCLIException
+ */
+ public abstract APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException;
+
+ /**
+ * Resume a supended entity.
+ * @param entityType Valid options are feed or process.
+ * @param entityName Name of the entity.
+ * @param colo Colo on which the query should be run.
+ * @param doAsUser proxy user
+ * @return Result of the resume command.
+ * @throws FalconCLIException
+ */
+ public abstract APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException;
+
+ /**
+ * Get status of the entity.
+ * @param entityType Valid options are feed or process.
+ * @param entityName Name of the entity.
+ * @param colo Colo on which the query should be run.
+ * @param doAsUser proxy user
+ * @return Status of the entity.
+ * @throws FalconCLIException
+ */
+ public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 217cec9..756828f 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -61,6 +61,22 @@ public class LocalProxyOozieClient extends OozieClient {
return localOozieClientCoord;
}
+ private OozieClient getClient(String jobId) {
+ if (jobId != null) {
+ if (jobId.toUpperCase().endsWith("B")) { //checking if it's a bundle job
+ return getLocalOozieClientBundle();
+ } else if (jobId.toUpperCase().endsWith("C")) { //checking if it's a coordinator job
+ return getLocalOozieClientCoord();
+ } else if (jobId.toUpperCase().endsWith("W")) { //checking if it's a workflow job
+ return getLocalOozieClient();
+ } else {
+ throw new IllegalArgumentException("Couldn't decide the type for the job-id " + jobId);
+ }
+ } else {
+ throw new IllegalArgumentException("Job-id cannot be null");
+ }
+ }
+
@Override
public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
return getLocalOozieClientBundle().getBundleJobInfo(jobId);
@@ -155,12 +171,12 @@ public class LocalProxyOozieClient extends OozieClient {
@Override
public void suspend(String jobId) throws OozieClientException {
- throw new IllegalStateException("Suspend not supported ");
+ getClient(jobId).suspend(jobId);
}
@Override
public void resume(String jobId) throws OozieClientException {
- throw new IllegalStateException("Resume not supported ");
+ getClient(jobId).resume(jobId);
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 3ce261e..783af19 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -198,6 +198,24 @@ public class FalconUnitClient extends AbstractFalconClient {
return null;
}
+ @Override
+ public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException {
+ return localSchedulableEntityManager.suspend(entityType.name(), entityName, colo);
+ }
+
+ @Override
+ public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException {
+ return localSchedulableEntityManager.resume(entityType.name(), entityName, colo);
+ }
+
+ @Override
+ public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws
+ FalconCLIException {
+ return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo);
+ }
+
private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) {
if (entityType == EntityType.FEED) {
return checkAndUpdateFeedClusters(entity, cluster);
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
index d793cf2..8b1c435 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.unit;
+import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
/**
@@ -24,4 +25,19 @@ import org.apache.falcon.resource.AbstractSchedulableEntityManager;
*/
public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager {
// Created for future purposes to add all entity API's here for falcon unit.
+
+ public LocalSchedulableEntityManager() {}
+
+ public APIResult suspend(String type, String entity, String colo) {
+ return super.suspend(null, type, entity, colo);
+ }
+
+ public APIResult resume(String type, String entity, String colo) {
+ return super.resume(null, type, entity, colo);
+ }
+
+ public APIResult getStatus(String type, String entity, String colo) {
+ return super.getStatus(type, entity, colo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 45b88f0..d12efbc 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -43,7 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterTest;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import java.io.File;
@@ -120,8 +120,13 @@ public class FalconUnitTestBase {
FalconUnit.cleanup();
}
- @AfterTest
- public void cleanUpActionXml() throws IOException {
+ @AfterMethod
+ public void cleanUpActionXml() throws IOException, FalconException {
+ for (EntityType type : EntityType.values()) {
+ for (String name : ConfigurationStore.get().getEntities(type)) {
+ ConfigurationStore.get().remove(type, name);
+ }
+ }
//Needed since oozie writes action xml to current directory.
FileUtils.deleteQuietly(new File("action.xml"));
FileUtils.deleteQuietly(new File(".action.xml.crc"));
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index d2e574b..d504bd2 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -87,4 +87,34 @@ public class TestFalconUnit extends FalconUnitTestBase {
Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status);
Assert.assertFalse(fs.exists(new Path(inPath)));
}
+
+ @Test
+ public void testSuspendAndResume() throws Exception {
+ // submit with default props
+ submitCluster();
+ // submitting feeds
+ APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
+ assertStatus(result);
+ result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
+ assertStatus(result);
+ // submitting and scheduling process
+ String scheduleTime = "2015-06-20T00:00Z";
+ createData("in", "local", scheduleTime, "input.txt");
+ result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr");
+ assertStatus(result);
+ result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"),
+ true, "");
+ assertStatus(result);
+ waitForStatus(EntityType.PROCESS, "process1", scheduleTime);
+ result = getClient().suspend(EntityType.PROCESS, "process1", "local", null);
+ assertStatus(result);
+ result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+ assertStatus(result);
+ Assert.assertEquals(result.getMessage(), "SUSPENDED");
+ result = getClient().resume(EntityType.PROCESS, "process1", "local", null);
+ assertStatus(result);
+ result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+ assertStatus(result);
+ Assert.assertEquals(result.getMessage(), "RUNNING");
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/resources/process1.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml
new file mode 100644
index 0000000..37dbb9c
--- /dev/null
+++ b/unit/src/test/resources/process1.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="process1" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <cluster name="local">
+ <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/>
+ </cluster>
+ </clusters>
+
+ <parallel>5</parallel>
+ <order>FIFO</order>
+ <frequency>minutes(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <inputs>
+ <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+ <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" />
+ </inputs>
+
+ <outputs>
+ <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+ <output name="outpath" feed="out" instance="now(0,0)"/>
+ </outputs>
+
+ <properties>
+ <!-- In the workflow, these properties will be available with variable - key -->
+ <property name="queueName" value="default"/>
+ <!-- The schedule time available as a property in workflow -->
+ <property name="time" value="${instanceTime()}"/>
+ </properties>
+
+ <workflow engine="oozie" path="/app/oozie-mr"/>
+</process>