You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/06 07:59:18 UTC

falcon git commit: FALCON-2235 Suspend/Resume API support for extension job (user extension)

Repository: falcon
Updated Branches:
  refs/heads/master 3f6b69024 -> 860866531


FALCON-2235 Suspend/Resume API support for extension job (user extension)

Author: sandeep <sa...@gmail.com>

Reviewers: @pallavi-rao

Closes #336 from sandeepSamudrala/FALCON-2235 and squashes the following commits:

f1f1f03 [sandeep] FALCON-2235 Incorporated  review comments
554824d [sandeep] FALCON-2235 new bufferedRequest to let mark/reset apis validation work for the streams
80ffd94 [sandeep] FALCON-2235 Incorporated Review comments
73a57f8 [sandeep] FALCON-2235 Suspend/Resume API support for extension job (user extension)
32e9982 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2235
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/86086653
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/86086653
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/86086653

Branch: refs/heads/master
Commit: 86086653134d9c0ccd854237f40a6a15276c0a41
Parents: 3f6b690
Author: sandeep <sa...@gmail.com>
Authored: Fri Jan 6 13:29:02 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jan 6 13:29:02 2017 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   |   6 +-
 .../falcon/client/AbstractFalconClient.java     |  19 +++
 .../org/apache/falcon/client/FalconClient.java  |   6 +-
 .../resource/AbstractExtensionManager.java      |  39 ++----
 .../falcon/resource/proxy/EntityProxyUtil.java  |  49 +++++++
 .../resource/proxy/ExtensionManagerProxy.java   | 136 ++++++++++---------
 .../proxy/SchedulableEntityManagerProxy.java    |  41 +-----
 .../apache/falcon/unit/FalconUnitClient.java    |  22 ++-
 .../falcon/unit/LocalExtensionManager.java      |  42 ++++--
 .../org/apache/falcon/unit/TestFalconUnit.java  |  17 ++-
 .../falcon/resource/ExtensionManager.java       |  22 +++
 11 files changed, 251 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 2a105dc..60578d0 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -139,10 +139,12 @@ public class FalconExtensionCLI extends FalconCLI{
             result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
-            result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
+            colo = getColo(colo);
+            result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
-            result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
+            colo = getColo(colo);
+            result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage();
         } else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
             validateRequiredParameter(jobName, JOB_NAME_OPT);
             result = client.deleteExtensionJob(jobName, doAsUser).getMessage();

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 7b8a606..49392c2 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -255,6 +255,25 @@ public abstract class AbstractFalconClient {
      * @return APIResult status of the deletion query.
      */
     public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);
+
+    /**
+     *
+     * @param jobName name of the extension that has to be suspended.
+     * @param coloExpr comma separated list of colos where the operation has to be performed.
+     * @param doAsUser proxy user
+     * @return result status of the suspend operation.
+     */
+    public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser);
+
+    /**
+     *
+     * @param jobName name of the extension that has to be resumed.
+     * @param coloExpr comma separated list of colos where the operation has to be performed.
+     * @param doAsUser proxy user.
+     * @return result status of the resume operation.
+     */
+    public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser);
+
     /**
      *  Prepares set of entities the extension has implemented to validate the extension job.
      * @param jobName job name of the extension job.

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 2772085..cf457ea 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1209,17 +1209,19 @@ public class FalconClient extends AbstractFalconClient {
         return getResponse(APIResult.class, clientResponse);
     }
 
-    public APIResult suspendExtensionJob(final String jobName, final String doAsUser)  {
+    public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser)  {
         ClientResponse clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.SUSPEND.path, jobName)
+                .addQueryParam(COLO, coloExpr)
                 .addQueryParam(DO_AS_OPT, doAsUser)
                 .call(ExtensionOperations.SUSPEND);
         return getResponse(APIResult.class, clientResponse);
     }
 
-    public APIResult resumeExtensionJob(final String jobName, final String doAsUser)  {
+    public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser)  {
         ClientResponse clientResponse = new ResourceBuilder()
                 .path(ExtensionOperations.RESUME.path, jobName)
+                .addQueryParam(COLO, coloExpr)
                 .addQueryParam(DO_AS_OPT, doAsUser)
                 .call(ExtensionOperations.RESUME);
         return getResponse(APIResult.class, clientResponse);

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 63bf1b6..ff89682 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -20,11 +20,8 @@ package org.apache.falcon.resource;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.extensions.ExtensionStatus;
-import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.store.ExtensionStore;
@@ -37,8 +34,6 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
     private static final String LAST_UPDATE_TIME  = "lastUpdatedTime";
 
     public static final String NAME = "name";
-    protected static final String EXTENSION_TYPE = "type";
-    protected static final String EXTENSION_DESC = "description";
-    protected static final String EXTENSION_LOCATION = "location";
+    private static final String EXTENSION_TYPE = "type";
+    private static final String EXTENSION_DESC = "description";
+    private static final String EXTENSION_LOCATION = "location";
 
     protected static void validateExtensionName(final String extensionName) {
         if (StringUtils.isBlank(extensionName)) {
@@ -114,28 +109,14 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         }
     }
 
-    protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
-        throws FalconException, IOException {
-        TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
-        List<String> processes = extensionJobsBean.getProcesses();
-        List<String> feeds = extensionJobsBean.getFeeds();
-        entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
-        entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
+    protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
+        throws FalconException {
+        TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
+        entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
+        entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
         return entityMap;
     }
 
-    private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
-        List<Entity> entities = new ArrayList<>();
-        for (String entityName : entityNames) {
-            try {
-                entities.add(EntityUtil.getEntity(entityType, entityName));
-            } catch (EntityNotRegisteredException e) {
-                LOG.error("Entity {}  not found during deletion nothing to do", entityName);
-            }
-        }
-        return entities;
-    }
-
     private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
@@ -174,7 +155,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         return tags.substring(nameStart, nameEnd);
     }
 
-    public String disableExtension(String extensionName, String currentUser) {
+    protected String disableExtension(String extensionName, String currentUser) {
         validateExtensionName(extensionName);
         try {
             return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
@@ -183,7 +164,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         }
     }
 
-    public String enableExtension(String extensionName, String currentUser) {
+    protected String enableExtension(String extensionName, String currentUser) {
         validateExtensionName(extensionName);
         try {
             return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
index ae0a61a..7d00442 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
@@ -120,6 +120,55 @@ class EntityProxyUtil {
         return results;
     }
 
+    APIResult proxySchedule(final String type, final String entity, final String coloExpr,
+                                    final Boolean skipDryRun, final String properties,
+                                    final HttpServletRequest bufferedRequest) {
+        return new EntityProxy(type, entity) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return getColosFromExpression(coloExpr, type, entity);
+            }
+
+            @Override
+            protected APIResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
+                        colo, skipDryRun, properties);
+            }
+        }.execute();
+    }
+
+    APIResult proxySuspend(final String type, final String entity, final String coloExpr,
+                                   final HttpServletRequest bufferedRequest) {
+        return new EntityProxy(type, entity) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return getColosFromExpression(coloExpr, type, entity);
+            }
+
+            @Override
+            protected APIResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
+                        colo);
+            }
+        }.execute();
+    }
+
+    APIResult proxyResume(final String type, final String entity, final String coloExpr,
+                          final HttpServletRequest bufferedRequest) {
+        return new EntityProxy(type, entity) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return getColosFromExpression(coloExpr, type, entity);
+            }
+
+            @Override
+            protected APIResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
+                        colo);
+            }
+        }.execute();
+    }
+
     Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
                                        final HttpServletRequest bufferedRequest, Entity newEntity) {
         final Set<String> oldColos = getApplicableColos(type, entityName);

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 6f75dc7..8733170 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -205,11 +205,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
                     Response.Status.NOT_FOUND);
         }
 
-        SortedMap<EntityType, List<Entity>> entityMap;
+        SortedMap<EntityType, List<String>> entityMap;
         try {
             entityMap = getJobEntities(extensionJobsBean);
             scheduleEntities(entityMap, request, coloExpr);
-        } catch (FalconException | IOException | JAXBException e) {
+        } catch (FalconException e) {
             LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
@@ -221,27 +221,47 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     public APIResult suspend(@PathParam("job-name") String jobName,
-                             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+                             @Context HttpServletRequest request,
+                             @DefaultValue("") @QueryParam("doAs") String doAsUser,
+                             @QueryParam("colo") final String coloExpr) {
         checkIfExtensionServiceIsEnabled();
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        if (extensionJobsBean == null) {
+            // return failure if the extension job doesn't exist
+            LOG.error("Extension Job not found:" + jobName);
+            throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+                    Response.Status.NOT_FOUND);
+        }
+
         try {
-            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
+            SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean);
+            suspendEntities(entityNameMap, coloExpr, request);
+        } catch (FalconException e) {
+            LOG.error("Error while suspending entities of the extension: " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+    }
+
+    private void suspendEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr,
+                                 final HttpServletRequest request) throws FalconException {
+        HttpServletRequest bufferedRequest = new BufferedRequest(request);
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) {
+            for (final String entityName : entityTypeEntry.getValue()) {
+                entityProxyUtil.proxySuspend(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest);
             }
+        }
+    }
 
-            for (Entity entity : entities) {
-                if (entity.getEntityType().isSchedulable()) {
-                    if (getWorkflowEngine(entity).isActive(entity)) {
-                        getWorkflowEngine(entity).suspend(entity);
-                    }
-                }
+    private void resumeEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr,
+                                final HttpServletRequest request) throws FalconException {
+        HttpServletRequest bufferedRequest = new BufferedRequest(request);
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) {
+            for (final String entityName : entityTypeEntry.getValue()) {
+                entityProxyUtil.proxyResume(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest);
             }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
-            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
-        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
     }
 
     @POST
@@ -249,24 +269,23 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     public APIResult resume(@PathParam("job-name") String jobName,
+                            @Context HttpServletRequest request,
+                            @QueryParam("colo") final String coloExpr,
                             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
         checkIfExtensionServiceIsEnabled();
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        if (extensionJobsBean == null) {
+            // return failure if the extension job doesn't exist
+            LOG.error("Extension Job not found:" + jobName);
+            throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+                    Response.Status.NOT_FOUND);
+        }
         try {
-            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
-            if (entities.isEmpty()) {
-                // return failure if the extension job doesn't exist
-                return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
-            }
-
-            for (Entity entity : entities) {
-                if (entity.getEntityType().isSchedulable()) {
-                    if (getWorkflowEngine(entity).isSuspended(entity)) {
-                        getWorkflowEngine(entity).resume(entity);
-                    }
-                }
-            }
-        } catch (FalconException | IOException e) {
-            LOG.error("Error when resuming extension job " + jobName + ": ", e);
+            SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean);
+            resumeEntities(entityNameMap, coloExpr, request);
+        } catch (FalconException e) {
+            LOG.error("Error while resuming entities of the extension: " + jobName + ": ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully");
@@ -288,11 +307,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
                     "Extension job " + jobName + " doesn't exist. Nothing to delete.");
         }
 
-        SortedMap<EntityType, List<Entity>> entityMap;
+        SortedMap<EntityType, List<String>> entityMap;
         try {
             entityMap = getJobEntities(extensionJobsBean);
             deleteEntities(entityMap, request);
-        } catch (FalconException | IOException | JAXBException e) {
+        } catch (FalconException e) {
             LOG.error("Error when deleting extension job: " + jobName + ": ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
         }
@@ -388,10 +407,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         checkIfExtensionIsEnabled(extensionName);
         checkIfExtensionJobExists(jobName, extensionName);
         SortedMap<EntityType, List<Entity>> entityMap;
+        SortedMap<EntityType, List<String>> entityNameMap;
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         try {
             entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
             submitEntities(extensionName, jobName, entityMap, config, request);
-            scheduleEntities(entityMap, request, coloExpr);
+            entityNameMap = getJobEntities(metaStore.getExtensionJobDetails(jobName));
+            scheduleEntities(entityNameMap, request, coloExpr);
         } catch (FalconException | IOException | JAXBException e) {
             LOG.error("Error while submitting extension job: ", e);
             throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -399,27 +421,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
     }
 
-    private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr)
-        throws FalconException, JAXBException, IOException {
-        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
-            for (final Entity entity : entry.getValue()) {
-                final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
-                final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
-                final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity);
-
-                new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
-                    @Override
-                    protected Set<String> getColosToApply() {
-                        return colos;
-                    }
-
-                    @Override
-                    protected APIResult doExecute(String colo) throws FalconException {
-                        return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest,
-                                entity.getEntityType().toString(),
-                                entity.getName(), colo, Boolean.FALSE, "");
-                    }
-                }.execute();
+    private void scheduleEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request,
+                                  String coloExpr) throws FalconException {
+        HttpServletRequest bufferedRequest = new BufferedRequest(request);
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+            for (final String entityName : entityTypeEntry.getValue()) {
+                entityProxyUtil.proxySchedule(entityTypeEntry.getKey().name(), entityName, coloExpr,
+                        Boolean.FALSE, "", bufferedRequest);
             }
         }
     }
@@ -431,16 +439,14 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         return new BufferedRequest(request);
     }
 
-    private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request)
-        throws IOException, JAXBException {
-        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
-            for (final Entity entity : entry.getValue()) {
-                final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
-                final String entityType = entity.getEntityType().toString();
-                final String entityName = entity.getName();
-                entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest);
+    private void deleteEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request)
+        throws FalconException {
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+            for (final String entityName : entityTypeEntry.getValue()) {
+                HttpServletRequest bufferedRequest = new BufferedRequest(request);
+                entityProxyUtil.proxyDelete(entityTypeEntry.getKey().name(), entityName, bufferedRequest);
                 if (!embeddedMode) {
-                    super.delete(bufferedRequest, entityType, entityName, currentColo);
+                    super.delete(bufferedRequest, entityTypeEntry.getKey().name(), entityName, currentColo);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 8f41c48..5b5d690 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -469,18 +469,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
                               @QueryParam("properties") final String properties) {
 
         final HttpServletRequest bufferedRequest = getBufferedRequest(request);
-        return new EntityProxy(type, entity) {
-            @Override
-            protected Set<String> getColosToApply() {
-                return getColosFromExpression(coloExpr, type, entity);
-            }
-
-            @Override
-            protected APIResult doExecute(String colo) throws FalconException {
-                return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
-                        colo, skipDryRun, properties);
-            }
-        }.execute();
+        return entityProxyUtil.proxySchedule(type, entity, coloExpr, skipDryRun, properties, bufferedRequest);
     }
 
     /**
@@ -531,22 +520,11 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
                              @Dimension("colo") @QueryParam("colo") final String coloExpr) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
-        return new EntityProxy(type, entity) {
-            @Override
-            protected Set<String> getColosToApply() {
-                return getColosFromExpression(coloExpr, type, entity);
-            }
-
-            @Override
-            protected APIResult doExecute(String colo) throws FalconException {
-                return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
-                        colo);
-            }
-        }.execute();
+        return entityProxyUtil.proxySuspend(type, entity, coloExpr, bufferedRequest);
     }
 
     /**
-     * Resume a supended entity.
+     * Resume a suspended entity.
      * @param request Servlet Request
      * @param type Valid options are feed or process.
      * @param entity Name of the entity.
@@ -564,18 +542,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
             @Dimension("colo") @QueryParam("colo") final String coloExpr) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
-        return new EntityProxy(type, entity) {
-            @Override
-            protected Set<String> getColosToApply() {
-                return getColosFromExpression(coloExpr, type, entity);
-            }
-
-            @Override
-            protected APIResult doExecute(String colo) throws FalconException {
-                return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
-                        colo);
-            }
-        }.execute();
+        return entityProxyUtil.proxyResume(type, entity, coloExpr, bufferedRequest);
     }
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 6a65d2c..d76dbca 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -77,14 +77,14 @@ public class FalconUnitClient extends AbstractFalconClient {
     private static final String DEFAULT_ORDER_BY = "status";
     private static final String DEFAULT_SORTED_ORDER = "asc";
 
-    protected ConfigurationStore configStore;
+    private ConfigurationStore configStore;
     private AbstractWorkflowEngine workflowEngine;
     private LocalSchedulableEntityManager localSchedulableEntityManager;
     private LocalInstanceManager localInstanceManager;
     private LocalExtensionManager localExtensionManager;
 
 
-    public FalconUnitClient() throws FalconException {
+    FalconUnitClient() throws FalconException {
         configStore = ConfigurationStore.get();
         workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
         localSchedulableEntityManager = new LocalSchedulableEntityManager();
@@ -123,7 +123,6 @@ public class FalconUnitClient extends AbstractFalconClient {
      * @param entityName entity name
      * @param cluster    cluster on which it has to be scheduled
      * @return
-     * @throws FalconException
      */
     @Override
     public APIResult schedule(EntityType entityType, String entityName, String cluster,
@@ -377,6 +376,23 @@ public class FalconUnitClient extends AbstractFalconClient {
         }
     }
 
+    @Override
+    public APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) {
+        try {
+            return localExtensionManager.suspendExtensionJob(jobName, coloExpr, doAsUser);
+        } catch (FalconException e) {
+            throw new FalconCLIException("Failed in suspending the extension job:" + jobName);
+        }
+    }
+
+    @Override
+    public APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) {
+        try {
+            return localExtensionManager.resumeExtensionJob(jobName, coloExpr, doAsUser);
+        } catch (FalconException e) {
+            throw new FalconCLIException("Failed in resuming the extension job:" + jobName);
+        }
+    }
 
     @Override
     public APIResult getExtensionJobDetails(final String jobName) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 20ccfca..a32dbfa 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -39,7 +39,7 @@ import java.util.SortedMap;
 /**
  * A proxy implementation of the extension operations in local mode.
  */
-public class LocalExtensionManager extends AbstractExtensionManager {
+class LocalExtensionManager extends AbstractExtensionManager {
     LocalExtensionManager() {}
 
     APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
@@ -99,10 +99,10 @@ public class LocalExtensionManager extends AbstractExtensionManager {
         throws FalconException, IOException{
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
-        SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
-        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
-            for (Entity entity : entry.getValue()) {
-                scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null);
+        SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+        for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) {
+            for (String entityName : entry.getValue()) {
+                scheduleInternal(entry.getKey().name(), entityName, true, null);
             }
         }
         return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
@@ -111,10 +111,10 @@ public class LocalExtensionManager extends AbstractExtensionManager {
     APIResult deleteExtensionJob(String jobName) throws FalconException, IOException {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
-        SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
-        for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
-            for (Entity entity : entry.getValue()) {
-                delete(entity.getEntityType().name(), entity.getName(), null);
+        SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+        for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) {
+            for (String entityName : entry.getValue()) {
+                delete(entry.getKey().name(), entityName, null);
             }
         }
         ExtensionStore.getMetaStore().deleteExtensionJob(jobName);
@@ -148,6 +148,30 @@ public class LocalExtensionManager extends AbstractExtensionManager {
         return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
     }
 
+    APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+            for (String entityName : entityTypeEntry.getValue()) {
+                super.suspend(null, entityTypeEntry.getKey().name(), entityName, coloExpr);
+            }
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+    }
+
+    APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException {
+        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+        ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+        SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+        for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+            for (String entityName : entityTypeEntry.getValue()) {
+                super.resume(null, entityTypeEntry.getKey().name(), entityName, coloExpr);
+            }
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully");
+    }
+
     APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) {
         return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 508a7bb..5717fc2 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -438,6 +438,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
         clearDB();
         submitCluster();
         createExtensionPackage();
+        createDir(PROCESS_APP_PATH);
+        fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
         String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
         String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
         Assert.assertEquals(result, "Extension :testExtension registered successfully.");
@@ -454,6 +456,14 @@ public class TestFalconUnit extends FalconUnitTestBase {
 
         apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null);
         assertStatus(apiResult);
+
+        apiResult = getClient().suspendExtensionJob(TEST_JOB, null, null);
+        assertStatus(apiResult);
+        apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
+        Assert.assertEquals(apiResult.getMessage(), "SUSPENDED");
+
+        apiResult = getClient().resumeExtensionJob(TEST_JOB, null, null);
+        assertStatus(apiResult);
         apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
         assertStatus(apiResult);
         Assert.assertEquals(apiResult.getMessage(), "RUNNING");
@@ -487,8 +497,13 @@ public class TestFalconUnit extends FalconUnitTestBase {
         }
     }
 
+    @Test
+    public void testExtensionJobSuspendAndResume() throws Exception {
+
+    }
+
 
-    void copyExtensionJar(String destDirPath) throws IOException {
+    private void copyExtensionJar(String destDirPath) throws IOException {
         File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
         for (File file : dir.listFiles()) {
             if (file.toString().endsWith(".jar")) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
index 3a6c9c0..ac05b0f 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -130,6 +130,28 @@ public class ExtensionManager extends AbstractExtensionManager {
                 + "on Prism.");
     }
 
+    @POST
+    @Path("suspend/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult suspend(@PathParam("job-name") String jobName,
+                             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        LOG.error("Suspend of an extension job is not supported on Server. Please run your operation on Prism.");
+        throw FalconWebException.newAPIException("Suspend of an extension job is not supported on Server. "
+                + "Please run your operation on Prism.");
+    }
+
+    @POST
+    @Path("resume/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult resume(@PathParam("job-name") String jobName,
+                            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        LOG.error("Resume of an extension job is not supported on Server. Please run your operation on Prism.");
+        throw FalconWebException.newAPIException("Resume of an extension job is not supported on Server. "
+                + "Please run your operation on Prism.");
+    }
+
     @GET
     @Path("definition/{extension-name}")
     @Produces({MediaType.APPLICATION_JSON})