You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/04/15 20:46:46 UTC

falcon git commit: FALCON-1789 Extension Job Management: REST API

Repository: falcon
Updated Branches:
  refs/heads/master 20bc33187 -> dc3e729a8


FALCON-1789 Extension Job Management: REST API

Tested REST APIs. Will add IT tests (FALCON-1905) and documentations (FALCON-1904) in separate pull requests.

Author: yzheng-hortonworks <yz...@hortonworks.com>

Reviewers: "Balu <ba...@apache.org>, Sowmya <so...@apache.org>"

Closes #98 from yzheng-hortonworks/FALCON-1789


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/dc3e729a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/dc3e729a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/dc3e729a

Branch: refs/heads/master
Commit: dc3e729a8b73a15b923ae6925af89b5a5644e289
Parents: 20bc331
Author: Ying Zheng <yz...@hortonworks.com>
Authored: Fri Apr 15 11:46:33 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Fri Apr 15 11:46:33 2016 -0700

----------------------------------------------------------------------
 .../falcon/resource/ExtensionInstanceList.java  |  95 +++++
 .../falcon/resource/ExtensionJobList.java       |  97 +++++
 .../falcon/resource/AbstractEntityManager.java  |  98 +++--
 .../resource/extensions/ExtensionManager.java   | 396 +++++++++++++++++++
 prism/src/main/webapp/WEB-INF/web.xml           |   4 +-
 .../src/main/webapp/WEB-INF/distributed/web.xml |   8 +-
 webapp/src/main/webapp/WEB-INF/embedded/web.xml |   4 +-
 webapp/src/main/webapp/WEB-INF/web.xml          |   4 +-
 8 files changed, 665 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java b/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java
new file mode 100644
index 0000000..65ca4d4
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/ExtensionInstanceList.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Instance list of an extension job used for marshalling / unmarshalling with REST calls.
+ */
+@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class ExtensionInstanceList {
+    @XmlElement
+    private int numEntities;
+
+    @XmlElementWrapper(name = "entitiesSummary")
+    private List<EntitySummary> entitySummary;
+
+    public ExtensionInstanceList() {
+        numEntities = 0;
+        entitySummary = null;
+    }
+
+    public ExtensionInstanceList(int numEntities) {
+        this.numEntities = numEntities;
+        entitySummary = new ArrayList<>();
+    }
+
+    public ExtensionInstanceList(int numEntities, List<EntitySummary> entitySummary) {
+        this.numEntities = numEntities;
+        this.entitySummary = entitySummary;
+    }
+
+    public void addEntitySummary(EntitySummary summary) {
+        entitySummary.add(summary);
+    }
+
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(numEntities + "\n\n");
+        for (EntitySummary summary : entitySummary) {
+            buffer.append(summary.toString());
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Summary of an entity (including entity properties and instances.
+     */
+    public static class EntitySummary {
+        @XmlElement
+        private EntityList.EntityElement entityProfile;
+
+        @XmlElement
+        private InstancesResult.Instance[] instances;
+
+        public EntitySummary() {
+            entityProfile = null;
+            instances = null;
+        }
+
+        public EntitySummary(EntityList.EntityElement entityProfile, InstancesResult.Instance[] instances) {
+            this.entityProfile = entityProfile;
+            this.instances = instances;
+        }
+
+        public String toString() {
+            StringBuilder buffer = new StringBuilder();
+            buffer.append(entityProfile.toString() + "\n");
+            buffer.append(Arrays.toString(instances) + "\n");
+            return buffer.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java b/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
new file mode 100644
index 0000000..71f92b8
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/ExtensionJobList.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Extension job list used for marshalling / unmarshalling with REST calls.
+ */
+@XmlRootElement
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class ExtensionJobList {
+
+    @XmlElement
+    private int numJobs;
+
+    @XmlElementWrapper(name = "jobs")
+    private List<JobElement> job;
+
+    public ExtensionJobList() {
+        numJobs = 0;
+        job = null;
+    }
+
+    public ExtensionJobList(int numJobs) {
+        this.numJobs = numJobs;
+        job = new ArrayList<JobElement>();
+    }
+
+    public ExtensionJobList(int numJobs, List<JobElement> elements) {
+        this.numJobs = numJobs;
+        this.job = elements;
+    }
+
+    public void addJob(JobElement element) {
+        job.add(element);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append(numJobs + "\n\n");
+        for (JobElement element : job) {
+            buffer.append(element.toString());
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Element for a job.
+     */
+    public static class JobElement {
+        @XmlElement
+        private String jobName;
+
+        @XmlElement
+        private EntityList jobEntities;
+
+        public JobElement() {
+            jobName = null;
+            jobEntities = null;
+        }
+
+        public JobElement(String name, EntityList entities) {
+            jobName = name;
+            jobEntities = entities;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buffer = new StringBuilder();
+            buffer.append("Job: " + jobName + ", #. entities: ");
+            buffer.append(jobEntities.toString() + "\n");
+            return buffer.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3ebe612..fde0cd7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -296,11 +296,21 @@ public abstract class AbstractEntityManager {
     protected APIResult update(InputStream inputStream, String type, String entityName,
                                String colo, Boolean skipDryRun) {
         checkColo(colo);
+        try {
+            EntityType entityType = EntityType.getEnum(type);
+            Entity entity = deserializeEntity(inputStream, entityType);
+            return update(entity, type, entityName, skipDryRun);
+        } catch (IOException | FalconException e) {
+            LOG.error("Update failed", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    protected APIResult update(Entity newEntity, String type, String entityName, Boolean skipDryRun) {
         List<Entity> tokenList = new ArrayList<>();
         try {
             EntityType entityType = EntityType.getEnum(type);
             Entity oldEntity = EntityUtil.getEntity(type, entityName);
-            Entity newEntity = deserializeEntity(inputStream, entityType);
             // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
             decorateEntityWithACL(newEntity);
             validate(newEntity);
@@ -400,7 +410,7 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    private void canRemove(Entity entity) throws FalconException {
+    protected void canRemove(Entity entity) throws FalconException {
         Pair<String, EntityType>[] referencedBy = EntityIntegrityChecker.referencedBy(entity);
         if (referencedBy != null && referencedBy.length > 0) {
             StringBuilder messages = new StringBuilder();
@@ -415,9 +425,14 @@ public abstract class AbstractEntityManager {
 
     protected Entity submitInternal(InputStream inputStream, String type, String doAsUser)
         throws IOException, FalconException {
-
         EntityType entityType = EntityType.getEnum(type);
         Entity entity = deserializeEntity(inputStream, entityType);
+        submitInternal(entity, doAsUser);
+        return entity;
+    }
+
+    protected synchronized void submitInternal(Entity entity, String doAsUser) throws IOException, FalconException {
+        EntityType entityType = entity.getEntityType();
         List<Entity> tokenList = new ArrayList<>();
         // KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
         decorateEntityWithACL(entity);
@@ -431,7 +446,7 @@ public abstract class AbstractEntityManager {
         Entity existingEntity = configStore.get(entityType, entity.getName());
         if (existingEntity != null) {
             if (EntityUtil.equals(existingEntity, entity)) {
-                return existingEntity;
+                return;
             }
 
             throw new EntityAlreadyExistsException(
@@ -442,8 +457,7 @@ public abstract class AbstractEntityManager {
         SecurityUtil.tryProxy(entity, doAsUser); // proxy before validating since FS/Oozie needs to be proxied
         validate(entity);
         configStore.publish(entityType, entity);
-        LOG.info("Submit successful: ({}): {}", type, entity.getName());
-        return entity;
+        LOG.info("Submit successful: ({}): {}", entityType, entity.getName());
     }
 
     /**
@@ -514,7 +528,7 @@ public abstract class AbstractEntityManager {
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private void validate(Entity entity) throws FalconException {
+    protected void validate(Entity entity) throws FalconException {
         EntityParser entityParser = EntityParserFactory.getParser(entity.getEntityType());
         entityParser.validate(entity);
     }
@@ -615,14 +629,37 @@ public abstract class AbstractEntityManager {
                                     String filterType, String filterTags, String filterBy,
                                     String orderBy, String sortOrder, Integer offset,
                                     Integer resultsPerPage, final String doAsUser) {
-
         HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toUpperCase().split(",")));
         Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
-        for (String  key : filterByFieldsValues.keySet()) {
+        for (String key : filterByFieldsValues.keySet()) {
             if (!key.toUpperCase().equals("NAME") && !key.toUpperCase().equals("CLUSTER")) {
                 fields.add(key.toUpperCase());
             }
         }
+        try {
+            // get filtered entities
+            List<Entity> entities = getEntityList(
+                    nameSubsequence, tagKeywords, filterType, filterTags, filterBy, doAsUser);
+
+            // sort entities and pagination
+            List<Entity> entitiesReturn = sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage);
+
+            // add total number of results
+            EntityList entityList = entitiesReturn.size() == 0
+                    ? new EntityList(new Entity[]{}, 0)
+                    : new EntityList(buildEntityElements(new HashSet<String>(fields), entitiesReturn), entities.size());
+            return entityList;
+        } catch (Exception e) {
+            LOG.error("Failed to get entity list", e);
+            throw FalconWebException.newAPIException(e);
+        }
+    }
+
+    public List<Entity> getEntityList(String nameSubsequence, String tagKeywords,
+                                      String filterType, String filterTags, String filterBy, final String doAsUser)
+        throws FalconException, IOException {
+
+        Map<String, List<String>> filterByFieldsValues = getFilterByFieldsValues(filterBy);
         validateEntityFilterByClause(filterByFieldsValues);
         if (StringUtils.isNotEmpty(filterTags)) {
             filterByFieldsValues.put(EntityList.EntityFilterByFields.TAGS.name(), Arrays.asList(filterTags));
@@ -630,34 +667,21 @@ public abstract class AbstractEntityManager {
 
         // get filtered entities
         List<Entity> entities = new ArrayList<Entity>();
-        try {
-            if (StringUtils.isEmpty(filterType)) {
-                // return entities of all types if no entity type specified
-                for (EntityType entityType : EntityType.values()) {
-                    entities.addAll(getFilteredEntities(
-                            entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser));
-                }
-            } else {
-                String[] types = filterType.split(",");
-                for (String type : types) {
-                    EntityType entityType = EntityType.getEnum(type);
-                    entities.addAll(getFilteredEntities(
-                            entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser));
-                }
+        if (StringUtils.isEmpty(filterType)) {
+            // return entities of all types if no entity type specified
+            for (EntityType entityType : EntityType.values()) {
+                entities.addAll(getFilteredEntities(
+                        entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser));
+            }
+        } else {
+            String[] types = filterType.split(",");
+            for (String type : types) {
+                EntityType entityType = EntityType.getEnum(type);
+                entities.addAll(getFilteredEntities(
+                        entityType, nameSubsequence, tagKeywords, filterByFieldsValues, "", "", "", doAsUser));
             }
-        } catch (Exception e) {
-            LOG.error("Failed to get entity list", e);
-            throw FalconWebException.newAPIException(e);
         }
-
-        // sort entities and pagination
-        List<Entity> entitiesReturn = sortEntitiesPagination(entities, orderBy, sortOrder, offset, resultsPerPage);
-
-        // add total number of results
-        EntityList entityList = entitiesReturn.size() == 0
-                ? new EntityList(new Entity[]{}, 0)
-                : new EntityList(buildEntityElements(new HashSet<String>(fields), entitiesReturn), entities.size());
-        return entityList;
+        return entities;
     }
 
     protected List<Entity> sortEntitiesPagination(List<Entity> entities, String orderBy, String sortOrder,
@@ -1032,7 +1056,7 @@ public abstract class AbstractEntityManager {
         return retLen;
     }
 
-    private EntityElement[] buildEntityElements(HashSet<String> fields, List<Entity> entities) {
+    protected EntityElement[] buildEntityElements(HashSet<String> fields, List<Entity> entities) {
         EntityElement[] elements = new EntityElement[entities.size()];
         int elementIndex = 0;
         for (Entity entity : entities) {
@@ -1041,7 +1065,7 @@ public abstract class AbstractEntityManager {
         return elements;
     }
 
-    private EntityElement getEntityElement(Entity entity, HashSet<String> fields) {
+    protected EntityElement getEntityElement(Entity entity, HashSet<String> fields) {
         EntityElement elem = new EntityElement();
         elem.type = entity.getEntityType().toString();
         elem.name = entity.getName();

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
new file mode 100644
index 0000000..07d4217
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.extensions;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.POST;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.Extension;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.ExtensionJobList;
+import org.apache.falcon.resource.ExtensionInstanceList;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.util.DeploymentUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Jersey Resource for extension job operations.
+ */
+@Path("extension")
+public class ExtensionManager extends AbstractSchedulableEntityManager {
+    public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
+
+    public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name=";
+    public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
+    public static final String TAG_SEPARATOR = ",";
+    public static final String ASCENDING_SORT_ORDER = "asc";
+    public static final String DESCENDING_SORT_ORDER = "desc";
+
+    private Extension extension = new Extension();
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    @GET
+    @Path("list/{extension-name}")
+    @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
+    public ExtensionJobList getExtensionJobs(
+            @PathParam("extension-name") String extensionName,
+            @DefaultValue("") @QueryParam("fields") String fields,
+            @DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") String sortOrder,
+            @DefaultValue("0") @QueryParam("offset") Integer offset,
+            @QueryParam("numResults") Integer resultsPerPage,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : resultsPerPage;
+        try {
+            // get filtered entities
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_NAME + extensionName, "", doAsUser);
+
+            // group entities by extension job name
+            Map<String, List<Entity>> groupedEntities = groupEntitiesByJob(entities);
+
+            // sort by extension job name
+            List<String> jobNames = new ArrayList<>(groupedEntities.keySet());
+            switch (sortOrder.toLowerCase()) {
+            case DESCENDING_SORT_ORDER :
+                Collections.sort(jobNames, Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER));
+                break;
+            default:
+                Collections.sort(jobNames, String.CASE_INSENSITIVE_ORDER);
+            }
+
+            // pagination and format output
+            int pageCount = getRequiredNumberOfResults(jobNames.size(), offset, resultsPerPage);
+            HashSet<String> fieldSet = new HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
+            ExtensionJobList jobList = new ExtensionJobList(pageCount);
+            for (int i = offset; i < offset + pageCount; i++) {
+                String jobName = jobNames.get(i);
+                List<Entity> jobEntities = groupedEntities.get(jobName);
+                EntityList entityList = new EntityList(buildEntityElements(fieldSet, jobEntities), jobEntities.size());
+                jobList.addJob(new ExtensionJobList.JobElement(jobName, entityList));
+            }
+            return jobList;
+        } catch (FalconException | IOException e) {
+            LOG.error("Failed to get extension job list of " + extensionName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @GET
+    @Path("instances/{job-name}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public ExtensionInstanceList getInstances(
+            @PathParam("job-name") final String jobName,
+            @QueryParam("start") final String nominalStart,
+            @QueryParam("end") final String nominalEnd,
+            @DefaultValue("") @QueryParam("instanceStatus") String instanceStatus,
+            @DefaultValue("") @QueryParam("fields") String fields,
+            @DefaultValue("") @QueryParam("orderBy") String orderBy,
+            @DefaultValue("") @QueryParam("sortOrder") String sortOrder,
+            @DefaultValue("0") @QueryParam("offset") final Integer offset,
+            @QueryParam("numResults") Integer resultsPerPage,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        resultsPerPage = resultsPerPage == null ? getDefaultResultsPerPage() : resultsPerPage;
+        try {
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            HashSet<String> fieldSet = new HashSet<>(Arrays.asList(fields.toUpperCase().split(",")));
+            ExtensionInstanceList instances = new ExtensionInstanceList(entities.size());
+            for (Entity entity : entities) {
+                InstancesResult entityInstances = super.getStatus(
+                        entity.getEntityType().name(), entity.getName(), nominalStart, nominalEnd,
+                        null, null, "STATUS:" + instanceStatus, orderBy, sortOrder, offset, resultsPerPage, null);
+                instances.addEntitySummary(new ExtensionInstanceList.EntitySummary(
+                        getEntityElement(entity, fieldSet), entityInstances.getInstances()));
+            }
+            return instances;
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when listing instances of extension job: " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    @POST
+    @Path("schedule/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult schedule(@PathParam("job-name") String jobName,
+                              @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            for (Entity entity : entities) {
+                scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
+    }
+
+    @POST
+    @Path("suspend/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult suspend(@PathParam("job-name") String jobName,
+                             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            for (Entity entity : entities) {
+                if (entity.getEntityType().isSchedulable()) {
+                    if (getWorkflowEngine(entity).isActive(entity)) {
+                        getWorkflowEngine(entity).suspend(entity);
+                    }
+                }
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+    }
+
+    @POST
+    @Path("resume/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult resume(@PathParam("job-name") String jobName,
+                            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            for (Entity entity : entities) {
+                if (entity.getEntityType().isSchedulable()) {
+                    if (getWorkflowEngine(entity).isSuspended(entity)) {
+                        getWorkflowEngine(entity).resume(entity);
+                    }
+                }
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when resuming extension job " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully");
+    }
+
+    @POST
+    @Path("delete/{job-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult delete(@PathParam("job-name") String jobName,
+                            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
+            for (Entity entity : entities) {
+                // TODO(yzheng): need to remember the entity dependency graph for clean ordered removal
+                canRemove(entity);
+                if (entity.getEntityType().isSchedulable() && !DeploymentUtil.isPrism()) {
+                    getWorkflowEngine(entity).delete(entity);
+                }
+                configStore.remove(entity.getEntityType(), entity.getName());
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when deleting extension job: " + jobName + ": ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " deleted successfully");
+    }
+
+    @POST
+    @Path("submit/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult submit(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = generateEntities(extensionName, request);
+            for (Entity entity : entities) {
+                submitInternal(entity, doAsUser);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when submitting extension job: ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully");
+    }
+
+    @POST
+    @Path("submitAndSchedule/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult submitAndSchedule(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = generateEntities(extensionName, request);
+            for (Entity entity : entities) {
+                submitInternal(entity, doAsUser);
+            }
+            for (Entity entity : entities) {
+                scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when submitting extension job: ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
+    }
+
+    @POST
+    @Path("update/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult update(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = generateEntities(extensionName, request);
+            for (Entity entity : entities) {
+                super.update(entity, entity.getEntityType().name(), entity.getName(), null);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when updating extension job: ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
+    }
+
+    @POST
+    @Path("validate/{extension-name}")
+    @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+    @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+    public APIResult validate(
+            @PathParam("extension-name") String extensionName,
+            @Context HttpServletRequest request,
+            @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+        try {
+            List<Entity> entities = generateEntities(extensionName, request);
+            for (Entity entity : entities) {
+                super.validate(entity);
+            }
+        } catch (FalconException | IOException e) {
+            LOG.error("Error when validating extension job: ", e);
+            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+        return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully");
+    }
+
+    private List<Entity> generateEntities(String extensionName, HttpServletRequest request)
+        throws FalconException, IOException {
+        // get entities for extension job
+        Properties properties = new Properties();
+        properties.load(request.getInputStream());
+        List<Entity> entities = extension.getEntities(extensionName, properties);
+
+        // add tags on extension name and job
+        for (Entity entity : entities) {
+            String tags = entity.getTags();
+            if (StringUtils.isEmpty(tags)) {
+                setEntityTags(entity, TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR
+                        + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName()));
+            } else {
+                if (tags.indexOf(TAG_PREFIX_EXTENSION_NAME) != -1) {
+                    throw new FalconException("Generated extention entity " + entity.getName()
+                            + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_NAME);
+                }
+                if (tags.indexOf(TAG_PREFIX_EXTENSION_JOB) != -1) {
+                    throw new FalconException("Generated extention entity " + entity.getName()
+                            + " should not contain tag prefix " + TAG_PREFIX_EXTENSION_JOB);
+                }
+                setEntityTags(entity, tags + TAG_SEPARATOR + TAG_PREFIX_EXTENSION_NAME + extensionName + TAG_SEPARATOR
+                        + TAG_PREFIX_EXTENSION_JOB + properties.getProperty(ExtensionProperties.JOB_NAME.getName()));
+            }
+        }
+
+        return entities;
+    }
+
+    private void setEntityTags(Entity entity, String tags) {
+        switch (entity.getEntityType()) {
+        case PROCESS:
+            ((Process) entity).setTags(tags);
+            break;
+        case FEED:
+            ((Feed) entity).setTags(tags);
+            break;
+        case CLUSTER:
+            ((Cluster) entity).setTags(tags);
+            break;
+        default:
+            LOG.error("Unknown entity type: {}", entity.getEntityType().name());
+        }
+    }
+
+    private Map<String, List<Entity>> groupEntitiesByJob(List<Entity> entities) {
+        Map<String, List<Entity>> groupedEntities = new HashMap<>();
+        for (Entity entity : entities) {
+            String jobName = getJobNameFromTag(entity.getTags());
+            if (!groupedEntities.containsKey(jobName)) {
+                groupedEntities.put(jobName, new ArrayList<Entity>());
+            }
+            groupedEntities.get(jobName).add(entity);
+        }
+        return groupedEntities;
+    }
+
+    private String getJobNameFromTag(String tags) {
+        int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
+        if (nameStart == -1) {
+            return null;
+        }
+
+        nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length();
+        int nameEnd = tags.indexOf(',', nameStart);
+        if (nameEnd == -1) {
+            nameEnd = tags.length();
+        }
+        return tags.substring(nameStart, nameEnd);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml
index 7c1a7ad..dcc114c 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -79,7 +79,9 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,
+                org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata,
+                org.apache.falcon.resource.extensions
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/distributed/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/distributed/web.xml b/webapp/src/main/webapp/WEB-INF/distributed/web.xml
index 4741897..e67191f 100644
--- a/webapp/src/main/webapp/WEB-INF/distributed/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/distributed/web.xml
@@ -99,7 +99,13 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.classnames</param-name>
             <param-value>
-                org.apache.falcon.resource.admin.AdminResource,org.apache.falcon.resource.provider.JAXBContextResolver,org.apache.falcon.resource.SchedulableEntityManager,org.apache.falcon.resource.InstanceManager,org.apache.falcon.resource.metadata.MetadataDiscoveryResource,org.apache.falcon.resource.metadata.LineageMetadataResource
+                org.apache.falcon.resource.admin.AdminResource,
+                org.apache.falcon.resource.provider.JAXBContextResolver,
+                org.apache.falcon.resource.SchedulableEntityManager,
+                org.apache.falcon.resource.InstanceManager,
+                org.apache.falcon.resource.metadata.MetadataDiscoveryResource,
+                org.apache.falcon.resource.metadata.LineageMetadataResource,
+                org.apache.falcon.resource.extensions.ExtensionManager
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/embedded/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/embedded/web.xml b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
index 5ecfe77..084f9ab 100644
--- a/webapp/src/main/webapp/WEB-INF/embedded/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/embedded/web.xml
@@ -79,7 +79,9 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,
+                org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata,
+                org.apache.falcon.resource.extensions
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>

http://git-wip-us.apache.org/repos/asf/falcon/blob/dc3e729a/webapp/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/WEB-INF/web.xml b/webapp/src/main/webapp/WEB-INF/web.xml
index acfa938..048d085 100644
--- a/webapp/src/main/webapp/WEB-INF/web.xml
+++ b/webapp/src/main/webapp/WEB-INF/web.xml
@@ -79,7 +79,9 @@
         <init-param>
             <param-name>com.sun.jersey.config.property.packages</param-name>
             <param-value>
-                org.apache.falcon.resource.admin,org.apache.falcon.resource.proxy,org.apache.falcon.resource.provider,org.apache.falcon.resource.metadata
+                org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,
+                org.apache.falcon.resource.proxy,org.apache.falcon.resource.metadata,
+                org.apache.falcon.resource.extensions
             </param-value>
         </init-param>
         <load-on-startup>1</load-on-startup>