You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/02 11:25:04 UTC

falcon git commit: FALCON-2197 enable and disable functionalities for extensions

Repository: falcon
Updated Branches:
  refs/heads/master 9fdfe713f -> 8054727ce


FALCON-2197 enable and disable functionalities for extensions

Author: Pracheer Agarwal <pr...@inmobi.com>
Author: Pracheer Agarwal <pr...@gmail.com>

Reviewers: @sandeepSamudrala,@pallavi-rao

Closes #328 from PracheerAgarwal/enableDisable and squashes the following commits:

f141efc [Pracheer Agarwal] back merge
dd6f9e9 [Pracheer Agarwal] back merge
7f17079 [Pracheer Agarwal] back merge
26b182f [Pracheer Agarwal] EC_UNRELATED_TYPES error fixes
5e60c83 [Pracheer Agarwal] checkstyle errors
3100641 [Pracheer Agarwal] bug fixes
e9fabdf [Pracheer Agarwal] squashing the commits
f4a08c8 [Pracheer Agarwal] # This is a combination of 3 commits. # This is the 1st commit message: FALCON-2197, status flag added for extensions for enable/disable functionality
0793f2e [Pracheer Agarwal] FALCON-2221 back merge
62271b3 [Pracheer Agarwal] FALCON-2221 bug fixes for extension job submit
ab830e4 [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs
c47613a [Pracheer Agarwal] FALCON-2197 adding disabled check for an extension while submitting jobs
2d9fc53 [Pracheer Agarwal] FALCON-2197, extension flag value changed
b39aea2 [Pracheer Agarwal] FALCON-2197, status flag added for extensions for enable/disable functionality
f885602 [Pracheer Agarwal] FALCON-2197, setting the default status flag for extensions
8a0c245 [Pracheer Agarwal] FALCON-2197, enable and disable functionalities for extensions
46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8054727c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8054727c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8054727c

Branch: refs/heads/master
Commit: 8054727ce160bf7e9707cee86b6ea2b77897a26e
Parents: 9fdfe71
Author: Pracheer Agarwal <pr...@inmobi.com>
Authored: Mon Jan 2 16:54:48 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Jan 2 16:54:48 2017 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   | 10 ++-
 .../falcon/client/AbstractFalconClient.java     | 14 ++++
 .../org/apache/falcon/client/FalconClient.java  | 16 +++-
 .../falcon/extensions/ExtensionStatus.java      | 37 +++++++++
 .../falcon/persistence/ExtensionBean.java       | 18 ++++-
 .../persistence/PersistenceConstants.java       |  1 +
 .../extensions/jdbc/ExtensionMetaStore.java     | 15 ++++
 .../falcon/extensions/store/ExtensionStore.java | 27 +++++++
 .../extensions/store/ExtensionStoreTest.java    | 37 +++++++++
 .../resource/AbstractExtensionManager.java      | 20 ++++-
 .../resource/proxy/ExtensionManagerProxy.java   | 82 ++++++++++++++++----
 .../apache/falcon/unit/FalconUnitClient.java    | 10 +++
 .../falcon/unit/LocalExtensionManager.java      |  9 ++-
 13 files changed, 278 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index aa436da..0343aa8 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -52,6 +52,8 @@ public class FalconExtensionCLI {
     public static final String UNREGISTER_OPT = "unregister";
     public static final String DETAIL_OPT = "detail";
     public static final String REGISTER_OPT = "register";
+    public static final String ENABLE_OPT = "enable";
+    public static final String DISABLE_OPT = "disable";
 
     // Input parameters
     public static final String EXTENSION_NAME_OPT = "extensionName";
@@ -153,10 +155,16 @@ public class FalconExtensionCLI {
                     commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
                     commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
             result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
+        } else if (optionsList.contains(ENABLE_OPT)) {
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+            result = client.enableExtension(extensionName).getMessage();
+        } else if (optionsList.contains(DISABLE_OPT)) {
+            validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
+            result = client.disableExtension(extensionName).getMessage();
         } else {
             throw new FalconCLIException("Invalid/missing extension command. Supported commands include "
                     + "enumerate, definition, describe, list, instances, submit, submitAndSchedule, "
-                    + "schedule, suspend, resume, delete, update, validate. "
+                    + "schedule, suspend, resume, delete, update, validate, enable, disable. "
                     + "Please refer to Falcon CLI twiki for more details.");
         }
         OUT.get().println(result);

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index e4ce993..3181b64 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -197,6 +197,20 @@ public abstract class AbstractFalconClient {
     public abstract APIResult unregisterExtension(String extensionName);
 
     /**
+     *
+     * @param extensionName extensionName that needs to be enabled
+     * @return Result of the enableExtension operation
+     */
+    public abstract APIResult enableExtension(String extensionName);
+
+    /**
+     *
+     * @param extensionName extensionName that needs to be disabled
+     * @return Result of the disableExtension operation
+     */
+    public abstract APIResult disableExtension(String extensionName);
+
+    /**
      * Prepares set of entities the extension has implemented and stage them to a local directory and submit them too.
      * @param extensionName extension which is available in the store.
      * @param jobName name to be used in all the extension entities' tagging that are built as part of

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9adb142..8401c9c 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -356,7 +356,9 @@ public class FalconClient extends AbstractFalconClient {
         UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
         DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
-        REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML);
+        REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
+        ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
+        DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);
 
         private String path;
         private String method;
@@ -1049,6 +1051,18 @@ public class FalconClient extends AbstractFalconClient {
         return getResponse(APIResult.class, clientResponse);
     }
 
+    public APIResult enableExtension(final String extensionName) {
+        ClientResponse clientResponse = new ResourceBuilder()
+                .path(ExtensionOperations.ENABLE.path, extensionName).call(ExtensionOperations.ENABLE);
+        return getResponse(APIResult.class, clientResponse);
+    }
+
+    public APIResult disableExtension(final String extensionName) {
+        ClientResponse clientResponse = new ResourceBuilder()
+                .path(ExtensionOperations.DISABLE.path, extensionName).call(ExtensionOperations.DISABLE);
+        return getResponse(APIResult.class, clientResponse);
+    }
+
     public APIResult getExtensionDefinition(final String extensionName)  {
         ClientResponse clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.DEFINITION.path, extensionName)

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
----------------------------------------------------------------------
diff --git a/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
new file mode 100644
index 0000000..8720096
--- /dev/null
+++ b/common-types/src/main/java/org/apache/falcon/extensions/ExtensionStatus.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.extensions;
+
+/**
+ * Enum to store ExtensionStatus.
+ */
+public enum ExtensionStatus {
+    ENABLED("enabled state"),
+    DISABLED("disabled state");
+
+    private final String text;
+
+    ExtensionStatus(final String text) {
+        this.text = text;
+    }
+
+    @Override
+    public String toString(){
+        return text;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
index 79cfe16..122ae5e 100644
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.persistence;
 
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.ExtensionType;
 
 import javax.persistence.Basic;
@@ -44,7 +45,8 @@ import java.util.Date;
         @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "),
         @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "),
         @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "),
-        @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName")
+        @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName"),
+        @NamedQuery(name = PersistenceConstants.CHANGE_EXTENSION_STATUS, query = "update ExtensionBean a set a.status = :extensionStatus where a.extensionName = :extensionName")
 })
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
 public class ExtensionBean {
@@ -79,6 +81,12 @@ public class ExtensionBean {
     @Column(name = "extension_owner")
     private String extensionOwner;
 
+    @Basic
+    @NotNull
+    @Column(name = "status")
+    @Enumerated(EnumType.STRING)
+    private ExtensionStatus status;
+
     public ExtensionType getExtensionType() {
         return extensionType;
     }
@@ -127,4 +135,12 @@ public class ExtensionBean {
         this.extensionOwner = extensionOwner;
     }
 
+    public ExtensionStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(ExtensionStatus status) {
+        this.status = status;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 26a5cd4..e80f7b7 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -75,6 +75,7 @@ public final class PersistenceConstants {
     public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE";
     public static final String DELETE_EXTENSION = "DELETE_EXTENSION";
     public static final String GET_EXTENSION = "GET_EXTENSION";
+    public static final String CHANGE_EXTENSION_STATUS = "CHANGE_EXTENSION_STATUS";
 
     public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
     public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 03f98f6..e53069a 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.extensions.jdbc;
 
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.ExtensionType;
 import org.apache.falcon.persistence.ExtensionBean;
 import org.apache.falcon.persistence.ExtensionJobsBean;
@@ -36,6 +37,7 @@ public class ExtensionMetaStore {
     private static final String EXTENSION_NAME = "extensionName";
     private static final String JOB_NAME = "jobName";
     private static final String EXTENSION_TYPE = "extensionType";
+    private static final String EXTENSION_STATUS = "extensionStatus";
 
     private EntityManager getEntityManager() {
         return FalconJPAService.get().getEntityManager();
@@ -50,6 +52,7 @@ public class ExtensionMetaStore {
         extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
         extensionBean.setDescription(description);
         extensionBean.setExtensionOwner(extensionOwner);
+        extensionBean.setStatus(ExtensionStatus.ENABLED);
         EntityManager entityManager = getEntityManager();
         try {
             beginTransaction(entityManager);
@@ -229,4 +232,16 @@ public class ExtensionMetaStore {
             entityManager.close();
         }
     }
+
+    public void updateExtensionStatus(String extensionName, ExtensionStatus status) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.CHANGE_EXTENSION_STATUS);
+        q.setParameter(EXTENSION_NAME, extensionName).setParameter(EXTENSION_STATUS, status);
+        try {
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index 13ff2d1..32b0cfd 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.ExtensionType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -378,4 +379,30 @@ public final class ExtensionStore {
         return (storePath != null);
     }
 
+    public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
+            FalconException {
+        // Validate first: trusted, unregistered, or not-owned extensions are rejected before any status is read.
+        validateStatusChange(extensionName, currentUser);
+        if (metaStore.getDetail(extensionName).getStatus().equals(status)) {
+            throw new ValidationException(extensionName + " is already in " + status.toString() + " state.");
+        }
+        metaStore.updateExtensionStatus(extensionName, status);
+        return "Status of extension: " + extensionName + " changed to " + status.toString() + " state.";
+    }
+
+    private void validateStatusChange(final String extensionName, String currentUser) throws FalconException {
+
+        ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED
+                : ExtensionType.CUSTOM;
+        if (extensionType.equals(ExtensionType.TRUSTED)) {
+            throw new ValidationException(extensionName + " is trusted. Status can't be changed for trusted "
+                    + "extensions.");
+        } else if (!metaStore.checkIfExtensionExists(extensionName)) {
+            throw new FalconException("Extension:" + extensionName + " is not registered with Falcon.");
+        } else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) {
+            throw new FalconException("User: " + currentUser + " is not allowed to change status of extension: "
+                    + extensionName);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 1b33e1b..b2fac5f 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.StoreAccessException;
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
 import org.apache.falcon.hadoop.JailedFileSystem;
@@ -157,6 +158,42 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
         store.deleteExtension("ACLFailure", "falconUser");
     }
 
+    @Test(expectedExceptions = FalconException.class)
+    public void testStatusChangeExtensionACLFailure() throws IOException, URISyntaxException, FalconException {
+        String extensionPath = EXTENSION_PATH + "testStatusChangeACLFailure";
+        createLibs(extensionPath);
+        createReadmeAndJar(extensionPath);
+        createMETA(extensionPath);
+        store = ExtensionStore.get();
+        store.registerExtension("testStatusChangeACLFailure", STORAGE_URL + extensionPath, "test desc", "falconUser");
+        store.updateExtensionStatus("testStatusChangeACLFailure", "oozieUser", ExtensionStatus.DISABLED);
+    }
+
+    @Test(expectedExceptions = ValidationException.class)
+    public void testStatusChangeExtensionValidationFailure() throws IOException, URISyntaxException, FalconException {
+        String extensionPath = EXTENSION_PATH + "testStatusChangeValidationFailure";
+        createLibs(extensionPath);
+        createReadmeAndJar(extensionPath);
+        createMETA(extensionPath);
+        store = ExtensionStore.get();
+        store.registerExtension("testStatusChangeValidationFailure", STORAGE_URL + extensionPath, "test desc",
+                "falconUser");
+        store.updateExtensionStatus("testStatusChangeValidationFailure", "falconUser", ExtensionStatus.ENABLED);
+    }
+
+    @Test()
+    public void testStatusChangeExtension() throws IOException, URISyntaxException, FalconException {
+        String extensionPath = EXTENSION_PATH + "testStatusChange";
+        createLibs(extensionPath);
+        createReadmeAndJar(extensionPath);
+        createMETA(extensionPath);
+        store = ExtensionStore.get();
+        store.registerExtension("testStatusChange", STORAGE_URL + extensionPath, "test desc", "falconUser");
+        store.updateExtensionStatus("testStatusChange", "falconUser", ExtensionStatus.DISABLED);
+        ExtensionMetaStore metaStore = new ExtensionMetaStore();
+        Assert.assertEquals(metaStore.getDetail("testStatusChange").getStatus(), ExtensionStatus.DISABLED);
+    }
+
     private void createMETA(String extensionPath) throws IOException {
         Path path = new Path(extensionPath + "/META");
         if (fs.exists(path)) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index f1ed6f5..63bf1b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconWebException;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -174,6 +174,24 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         return tags.substring(nameStart, nameEnd);
     }
 
+    public String disableExtension(String extensionName, String currentUser) {
+        validateExtensionName(extensionName);
+        try {
+            return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    public String enableExtension(String extensionName, String currentUser) {
+        validateExtensionName(extensionName);
+        try {
+            return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
     private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 551dbbf..2b5cbe7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -30,9 +30,10 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.Extension;
-import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.extensions.ExtensionStatus;
 import org.apache.falcon.extensions.ExtensionService;
 import org.apache.falcon.extensions.ExtensionType;
+import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.persistence.ExtensionBean;
@@ -192,6 +193,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     public APIResult schedule(@PathParam("job-name") String jobName,
                               @DefaultValue("") @QueryParam("doAs") String doAsUser) {
         checkIfExtensionServiceIsEnabled();
+        checkIfExtensionIsEnabled(ExtensionStore.getMetaStore().getExtensionJobDetails(jobName).getExtensionName());
         try {
             List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
             if (entities.isEmpty()) {
@@ -307,8 +309,9 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
             @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
+        checkIfExtensionIsEnabled(extensionName);
+        checkIfExtensionJobExists(jobName, extensionName);
         SortedMap<EntityType, List<Entity>> entityMap;
-
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
             submitEntities(extensionName, jobName, entityMap, config, request);
@@ -376,6 +379,8 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
             @FormDataParam("feeds") List<FormDataBodyPart> feedForms,
             @FormDataParam("config") InputStream config) {
         checkIfExtensionServiceIsEnabled();
+        checkIfExtensionIsEnabled(extensionName);
+        checkIfExtensionJobExists(jobName, extensionName);
         SortedMap<EntityType, List<Entity>> entityMap;
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
@@ -445,14 +450,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         List<String> feedNames = new ArrayList<>();
         List<String> processNames = new ArrayList<>();
 
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        byte[] configBytes = null;
+        if (configStream != null) {
+            configBytes = IOUtils.toByteArray(configStream);
+        }
         for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
             for (final Entity entity : entry.getValue()) {
-                final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
-                final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
-                entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
-                if (!embeddedMode) {
-                    super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
-                }
                 if (entity.getEntityType().equals(EntityType.FEED)) {
                     feedNames.add(entity.getName());
                 } else {
@@ -460,13 +464,18 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
                 }
             }
         }
+        metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
 
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        byte[] configBytes = null;
-        if (configStream != null) {
-            configBytes = IOUtils.toByteArray(configStream);
+        for(Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()){
+            for(final Entity entity : entry.getValue()){
+                final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
+                final Set<String> colos = getApplicableColos(entity.getEntityType().toString(), entity);
+                entityProxyUtil.proxySubmit(entity.getEntityType().toString(), bufferedRequest, entity, colos);
+                if (!embeddedMode) {
+                    super.submit(bufferedRequest, entity.getEntityType().toString(), currentColo);
+                }
+            }
         }
-        metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
     }
 
     private void updateEntities(String extensionName, String jobName,
@@ -700,6 +709,36 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         }
     }
 
+    @POST
+    @Path("disable/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces(MediaType.TEXT_PLAIN)
+    public APIResult disableExtension(
+            @PathParam("extension-name") String extensionName) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName,
+                    CurrentUser.getUser()));
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @POST
+    @Path("enable/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces(MediaType.TEXT_PLAIN)
+    public APIResult enableExtension(
+            @PathParam("extension-name") String extensionName) {
+        checkIfExtensionServiceIsEnabled();
+        try {
+            return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName,
+                    CurrentUser.getUser()));
+        } catch (Throwable e) {
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
     private List<Entity> generateEntities(String extensionName, InputStream configStream)
         throws FalconException, IOException {
         // get entities for extension job
@@ -731,4 +770,21 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
                     ExtensionService.SERVICE_NAME + " is not enabled.", Response.Status.NOT_FOUND);
         }
     }
+
+    /** Throws an API exception unless the meta store reports the named extension's status as ENABLED. */
+    private static void checkIfExtensionIsEnabled(String extensionName) {
+        if (!ExtensionStore.getMetaStore().getDetail(extensionName).getStatus().equals(ExtensionStatus.ENABLED)) {
+            throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
+                    Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /** Throws if a job named jobName is already stored under a different extension than extensionName. */
+    private static void checkIfExtensionJobExists(String jobName, String extensionName) {
+        ExtensionJobsBean extensionJobsBean = ExtensionStore.getMetaStore().getExtensionJobDetails(jobName);
+        if (extensionJobsBean != null && !extensionJobsBean.getExtensionName().equals(extensionName)) {
+            throw FalconWebException.newAPIException("Extension job with name: " + jobName + " already exists.",
+                    Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 9ed2a0d..2a40611 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -282,6 +282,16 @@ public class FalconUnitClient extends AbstractFalconClient {
     }
 
     @Override
+    public APIResult enableExtension(String extensionName) { // forwards to the localExtensionManager field
+        return localExtensionManager.enableExtension(extensionName);
+    }
+
+    @Override
+    public APIResult disableExtension(String extensionName) { // forwards to the localExtensionManager field
+        return localExtensionManager.disableExtension(extensionName);
+    }
+
+    @Override
     public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
 
         InputStream configStream = getServletInputStream(configPath);

http://git-wip-us.apache.org/repos/asf/falcon/blob/8054727c/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 0412ef2..1e9b15a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -140,6 +140,14 @@ public class LocalExtensionManager extends AbstractExtensionManager {
         return super.getExtensionJobDetail(jobName);
     }
 
+    public APIResult disableExtension(String extensionName) { // SUCCEEDED result wrapping the parent's reply
+        return new APIResult(APIResult.Status.SUCCEEDED, super.disableExtension(extensionName, CurrentUser.getUser()));
+    }
+
+    public APIResult enableExtension(String extensionName) { // SUCCEEDED result wrapping the parent's enable reply
+        return new APIResult(APIResult.Status.SUCCEEDED, super.enableExtension(extensionName, CurrentUser.getUser()));
+    }
+
     public APIResult getExtensionDetails(String extensionName){
         return super.getExtensionDetail(extensionName);
     }
@@ -147,5 +155,4 @@ public class LocalExtensionManager extends AbstractExtensionManager {
     public APIResult getExtensions(){
         return super.getExtensions();
     }
-
 }