You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/14 07:40:05 UTC

falcon git commit: FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal)

Repository: falcon
Updated Branches:
  refs/heads/master 4a64dec0e -> e6d63f9b8


FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e6d63f9b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e6d63f9b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e6d63f9b

Branch: refs/heads/master
Commit: e6d63f9b894a08dea46abc52c90c32d022a46708
Parents: 4a64dec
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Wed Oct 14 11:09:54 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Oct 14 11:09:54 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/client/AbstractFalconClient.java     | 36 ++++++++++++++
 .../oozie/client/LocalProxyOozieClient.java     | 20 +++++++-
 .../apache/falcon/unit/FalconUnitClient.java    | 18 +++++++
 .../unit/LocalSchedulableEntityManager.java     | 16 +++++++
 .../apache/falcon/unit/FalconUnitTestBase.java  | 11 +++--
 .../org/apache/falcon/unit/TestFalconUnit.java  | 30 ++++++++++++
 unit/src/test/resources/process1.xml            | 50 ++++++++++++++++++++
 8 files changed, 178 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2dc7e3c..e92c81e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1519 Suspend And Resume API's in Falcon Unit(Narayan Periwal via Pallavi Rao)
+
     FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)
 
     FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) 

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 2358289..b889931 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -81,4 +81,40 @@ public abstract class AbstractFalconClient {
                                                          Integer offset, Integer numResults,
                                                          String doAsUser) throws FalconCLIException;
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    /**
+     * Suspend an entity.
+     * @param entityType Valid options are feed or process.
+     * @param entityName Name of the entity.
+     * @param colo Colo on which the query should be run.
+     * @param doAsUser proxy user
+     * @return Status of the entity.
+     * @throws FalconCLIException
+     */
+    public abstract APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException;
+
+    /**
+     * Resume a supended entity.
+     * @param entityType Valid options are feed or process.
+     * @param entityName Name of the entity.
+     * @param colo Colo on which the query should be run.
+     * @param doAsUser proxy user
+     * @return Result of the resume command.
+     * @throws FalconCLIException
+     */
+    public abstract APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException;
+
+    /**
+     * Get status of the entity.
+     * @param entityType Valid options are feed or process.
+     * @param entityName Name of the entity.
+     * @param colo Colo on which the query should be run.
+     * @param doAsUser proxy user
+     * @return Status of the entity.
+     * @throws FalconCLIException
+     */
+    public abstract APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 217cec9..756828f 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -61,6 +61,22 @@ public class LocalProxyOozieClient extends OozieClient {
         return localOozieClientCoord;
     }
 
+    private OozieClient getClient(String jobId) {
+        if (jobId != null) {
+            if (jobId.toUpperCase().endsWith("B")) { //checking if it's a bundle job
+                return getLocalOozieClientBundle();
+            } else if (jobId.toUpperCase().endsWith("C")) { //checking if it's a coordinator job
+                return getLocalOozieClientCoord();
+            } else if (jobId.toUpperCase().endsWith("W")) { //checking if it's a workflow job
+                return getLocalOozieClient();
+            } else {
+                throw new IllegalArgumentException("Couldn't decide the type for the job-id " + jobId);
+            }
+        } else {
+            throw new IllegalArgumentException("Job-id cannot be null");
+        }
+    }
+
     @Override
     public BundleJob getBundleJobInfo(String jobId) throws OozieClientException {
         return getLocalOozieClientBundle().getBundleJobInfo(jobId);
@@ -155,12 +171,12 @@ public class LocalProxyOozieClient extends OozieClient {
 
     @Override
     public void suspend(String jobId) throws OozieClientException {
-        throw new IllegalStateException("Suspend not supported ");
+        getClient(jobId).suspend(jobId);
     }
 
     @Override
     public void resume(String jobId) throws OozieClientException {
-        throw new IllegalStateException("Resume not supported ");
+        getClient(jobId).resume(jobId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 3ce261e..783af19 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -198,6 +198,24 @@ public class FalconUnitClient extends AbstractFalconClient {
         return null;
     }
 
+    @Override
+    public APIResult suspend(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException {
+        return localSchedulableEntityManager.suspend(entityType.name(), entityName, colo);
+    }
+
+    @Override
+    public APIResult resume(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException {
+        return localSchedulableEntityManager.resume(entityType.name(), entityName, colo);
+    }
+
+    @Override
+    public APIResult getStatus(EntityType entityType, String entityName, String colo, String doAsUser) throws
+            FalconCLIException {
+        return localSchedulableEntityManager.getStatus(entityType.name(), entityName, colo);
+    }
+
     private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) {
         if (entityType == EntityType.FEED) {
             return checkAndUpdateFeedClusters(entity, cluster);

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
index d793cf2..8b1c435 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.unit;
 
+import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractSchedulableEntityManager;
 
 /**
@@ -24,4 +25,19 @@ import org.apache.falcon.resource.AbstractSchedulableEntityManager;
  */
 public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager {
     // Created for future purposes to add all entity API's here for falcon unit.
+
+    public LocalSchedulableEntityManager() {}
+
+    public APIResult suspend(String type, String entity, String colo) {
+        return super.suspend(null, type, entity, colo);
+    }
+
+    public APIResult resume(String type, String entity, String colo) {
+        return super.resume(null, type, entity, colo);
+    }
+
+    public APIResult getStatus(String type, String entity, String colo) {
+        return super.getStatus(type, entity, colo);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 45b88f0..d12efbc 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -43,7 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterTest;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 
 import java.io.File;
@@ -120,8 +120,13 @@ public class FalconUnitTestBase {
         FalconUnit.cleanup();
     }
 
-    @AfterTest
-    public void cleanUpActionXml() throws IOException {
+    @AfterMethod
+    public void cleanUpActionXml() throws IOException, FalconException {
+        for (EntityType type : EntityType.values()) {
+            for (String name : ConfigurationStore.get().getEntities(type)) {
+                ConfigurationStore.get().remove(type, name);
+            }
+        }
         //Needed since oozie writes action xml to current directory.
         FileUtils.deleteQuietly(new File("action.xml"));
         FileUtils.deleteQuietly(new File(".action.xml.crc"));

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index d2e574b..d504bd2 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -87,4 +87,34 @@ public class TestFalconUnit extends FalconUnitTestBase {
         Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status);
         Assert.assertFalse(fs.exists(new Path(inPath)));
     }
+
+    @Test
+    public void testSuspendAndResume() throws Exception {
+        // submit with default props
+        submitCluster();
+        // submitting feeds
+        APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
+        assertStatus(result);
+        result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
+        assertStatus(result);
+        // submitting and scheduling process
+        String scheduleTime = "2015-06-20T00:00Z";
+        createData("in", "local", scheduleTime, "input.txt");
+        result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr");
+        assertStatus(result);
+        result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"),
+                true, "");
+        assertStatus(result);
+        waitForStatus(EntityType.PROCESS, "process1", scheduleTime);
+        result = getClient().suspend(EntityType.PROCESS, "process1", "local", null);
+        assertStatus(result);
+        result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+        assertStatus(result);
+        Assert.assertEquals(result.getMessage(), "SUSPENDED");
+        result = getClient().resume(EntityType.PROCESS, "process1", "local", null);
+        assertStatus(result);
+        result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+        assertStatus(result);
+        Assert.assertEquals(result.getMessage(), "RUNNING");
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/e6d63f9b/unit/src/test/resources/process1.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml
new file mode 100644
index 0000000..37dbb9c
--- /dev/null
+++ b/unit/src/test/resources/process1.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="process1" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <cluster name="local">
+            <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/>
+        </cluster>
+    </clusters>
+
+    <parallel>5</parallel>
+    <order>FIFO</order>
+    <frequency>minutes(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <inputs>
+        <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+        <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" />
+    </inputs>
+
+    <outputs>
+        <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+        <output name="outpath" feed="out" instance="now(0,0)"/>
+    </outputs>
+
+    <properties>
+        <!-- In the workflow, these properties will be available with variable - key -->
+        <property name="queueName" value="default"/>
+        <!-- The schedule time available as a property in workflow -->
+        <property name="time" value="${instanceTime()}"/>
+    </properties>
+
+    <workflow engine="oozie" path="/app/oozie-mr"/>
+</process>