You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/06 07:59:18 UTC
falcon git commit: FALCON-2235 Suspend/Resume API support for
extension job (user extension)
Repository: falcon
Updated Branches:
refs/heads/master 3f6b69024 -> 860866531
FALCON-2235 Suspend/Resume API support for extension job (user extension)
Author: sandeep <sa...@gmail.com>
Reviewers: @pallavi-rao
Closes #336 from sandeepSamudrala/FALCON-2235 and squashes the following commits:
f1f1f03 [sandeep] FALCON-2235 Incorporated review comments
554824d [sandeep] FALCON-2235 new bufferedRequest to let mark/reset apis validation work for the streams
80ffd94 [sandeep] FALCON-2235 Incorporated Review comments
73a57f8 [sandeep] FALCON-2235 Suspend/Resume API support for extension job (user extension)
32e9982 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2235
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/86086653
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/86086653
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/86086653
Branch: refs/heads/master
Commit: 86086653134d9c0ccd854237f40a6a15276c0a41
Parents: 3f6b690
Author: sandeep <sa...@gmail.com>
Authored: Fri Jan 6 13:29:02 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jan 6 13:29:02 2017 +0530
----------------------------------------------------------------------
.../apache/falcon/cli/FalconExtensionCLI.java | 6 +-
.../falcon/client/AbstractFalconClient.java | 19 +++
.../org/apache/falcon/client/FalconClient.java | 6 +-
.../resource/AbstractExtensionManager.java | 39 ++----
.../falcon/resource/proxy/EntityProxyUtil.java | 49 +++++++
.../resource/proxy/ExtensionManagerProxy.java | 136 ++++++++++---------
.../proxy/SchedulableEntityManagerProxy.java | 41 +-----
.../apache/falcon/unit/FalconUnitClient.java | 22 ++-
.../falcon/unit/LocalExtensionManager.java | 42 ++++--
.../org/apache/falcon/unit/TestFalconUnit.java | 17 ++-
.../falcon/resource/ExtensionManager.java | 22 +++
11 files changed, 251 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 2a105dc..60578d0 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -139,10 +139,12 @@ public class FalconExtensionCLI extends FalconCLI{
result = client.scheduleExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUSPEND_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
- result = client.suspendExtensionJob(jobName, doAsUser).getMessage();
+ colo = getColo(colo);
+ result = client.suspendExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.RESUME_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
- result = client.resumeExtensionJob(jobName, doAsUser).getMessage();
+ colo = getColo(colo);
+ result = client.resumeExtensionJob(jobName, colo, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.DELETE_OPT)) {
validateRequiredParameter(jobName, JOB_NAME_OPT);
result = client.deleteExtensionJob(jobName, doAsUser).getMessage();
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index 7b8a606..49392c2 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -255,6 +255,25 @@ public abstract class AbstractFalconClient {
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);
+
+ /**
+ *
+ * @param jobName name of the extension that has to be suspended.
+ * @param coloExpr comma separated list of colos where the operation has to be performed.
+ * @param doAsUser proxy user
+ * @return result status of the suspend operation.
+ */
+ public abstract APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser);
+
+ /**
+ *
+ * @param jobName name of the extension that has to be resumed.
+ * @param coloExpr comma separated list of colos where the operation has to be performed.
+ * @param doAsUser proxy user.
+ * @return result status of the resume operation.
+ */
+ public abstract APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser);
+
/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 2772085..cf457ea 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -1209,17 +1209,19 @@ public class FalconClient extends AbstractFalconClient {
return getResponse(APIResult.class, clientResponse);
}
- public APIResult suspendExtensionJob(final String jobName, final String doAsUser) {
+ public APIResult suspendExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.SUSPEND.path, jobName)
+ .addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.SUSPEND);
return getResponse(APIResult.class, clientResponse);
}
- public APIResult resumeExtensionJob(final String jobName, final String doAsUser) {
+ public APIResult resumeExtensionJob(final String jobName, final String coloExpr, final String doAsUser) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.RESUME.path, jobName)
+ .addQueryParam(COLO, coloExpr)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.RESUME);
return getResponse(APIResult.class, clientResponse);
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 63bf1b6..ff89682 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -20,11 +20,8 @@ package org.apache.falcon.resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.extensions.ExtensionStatus;
-import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
@@ -37,8 +34,6 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -59,9 +54,9 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";
public static final String NAME = "name";
- protected static final String EXTENSION_TYPE = "type";
- protected static final String EXTENSION_DESC = "description";
- protected static final String EXTENSION_LOCATION = "location";
+ private static final String EXTENSION_TYPE = "type";
+ private static final String EXTENSION_DESC = "description";
+ private static final String EXTENSION_LOCATION = "location";
protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
@@ -114,28 +109,14 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
}
}
- protected SortedMap<EntityType, List<Entity>> getJobEntities(ExtensionJobsBean extensionJobsBean)
- throws FalconException, IOException {
- TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
- List<String> processes = extensionJobsBean.getProcesses();
- List<String> feeds = extensionJobsBean.getFeeds();
- entityMap.put(EntityType.PROCESS, getEntities(processes, EntityType.PROCESS));
- entityMap.put(EntityType.FEED, getEntities(feeds, EntityType.FEED));
+ protected SortedMap<EntityType, List<String>> getJobEntities(ExtensionJobsBean extensionJobsBean)
+ throws FalconException {
+ TreeMap<EntityType, List<String>> entityMap = new TreeMap<>();
+ entityMap.put(EntityType.PROCESS, extensionJobsBean.getProcesses());
+ entityMap.put(EntityType.FEED, extensionJobsBean.getFeeds());
return entityMap;
}
- private List<Entity> getEntities(List<String> entityNames, EntityType entityType) throws FalconException {
- List<Entity> entities = new ArrayList<>();
- for (String entityName : entityNames) {
- try {
- entities.add(EntityUtil.getEntity(entityType, entityName));
- } catch (EntityNotRegisteredException e) {
- LOG.error("Entity {} not found during deletion nothing to do", entityName);
- }
- }
- return entities;
- }
-
private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
@@ -174,7 +155,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
return tags.substring(nameStart, nameEnd);
}
- public String disableExtension(String extensionName, String currentUser) {
+ protected String disableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.DISABLED);
@@ -183,7 +164,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
}
}
- public String enableExtension(String extensionName, String currentUser) {
+ protected String enableExtension(String extensionName, String currentUser) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().updateExtensionStatus(extensionName, currentUser, ExtensionStatus.ENABLED);
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
index ae0a61a..7d00442 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/EntityProxyUtil.java
@@ -120,6 +120,55 @@ class EntityProxyUtil {
return results;
}
+ APIResult proxySchedule(final String type, final String entity, final String coloExpr,
+ final Boolean skipDryRun, final String properties,
+ final HttpServletRequest bufferedRequest) {
+ return new EntityProxy(type, entity) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return getColosFromExpression(coloExpr, type, entity);
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
+ colo, skipDryRun, properties);
+ }
+ }.execute();
+ }
+
+ APIResult proxySuspend(final String type, final String entity, final String coloExpr,
+ final HttpServletRequest bufferedRequest) {
+ return new EntityProxy(type, entity) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return getColosFromExpression(coloExpr, type, entity);
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
+ colo);
+ }
+ }.execute();
+ }
+
+ APIResult proxyResume(final String type, final String entity, final String coloExpr,
+ final HttpServletRequest bufferedRequest) {
+ return new EntityProxy(type, entity) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return getColosFromExpression(coloExpr, type, entity);
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
+ colo);
+ }
+ }.execute();
+ }
+
Map<String, APIResult> proxyUpdate(final String type, final String entityName, final Boolean skipDryRun,
final HttpServletRequest bufferedRequest, Entity newEntity) {
final Set<String> oldColos = getApplicableColos(type, entityName);
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 6f75dc7..8733170 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -205,11 +205,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
Response.Status.NOT_FOUND);
}
- SortedMap<EntityType, List<Entity>> entityMap;
+ SortedMap<EntityType, List<String>> entityMap;
try {
entityMap = getJobEntities(extensionJobsBean);
scheduleEntities(entityMap, request, coloExpr);
- } catch (FalconException | IOException | JAXBException e) {
+ } catch (FalconException e) {
LOG.error("Error while scheduling entities of the extension: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -221,27 +221,47 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult suspend(@PathParam("job-name") String jobName,
- @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ @Context HttpServletRequest request,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @QueryParam("colo") final String coloExpr) {
checkIfExtensionServiceIsEnabled();
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean == null) {
+ // return failure if the extension job doesn't exist
+ LOG.error("Extension Job not found:" + jobName);
+ throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+ Response.Status.NOT_FOUND);
+ }
+
try {
- List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
- if (entities.isEmpty()) {
- // return failure if the extension job doesn't exist
- return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
+ SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean);
+ suspendEntities(entityNameMap, coloExpr, request);
+ } catch (FalconException e) {
+ LOG.error("Error while suspending entities of the extension: " + jobName + ": ", e);
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+ }
+
+ private void suspendEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr,
+ final HttpServletRequest request) throws FalconException {
+ HttpServletRequest bufferedRequest = new BufferedRequest(request);
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) {
+ for (final String entityName : entityTypeEntry.getValue()) {
+ entityProxyUtil.proxySuspend(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest);
}
+ }
+ }
- for (Entity entity : entities) {
- if (entity.getEntityType().isSchedulable()) {
- if (getWorkflowEngine(entity).isActive(entity)) {
- getWorkflowEngine(entity).suspend(entity);
- }
- }
+ private void resumeEntities(SortedMap<EntityType, List<String>> entityNameMap, String coloExpr,
+ final HttpServletRequest request) throws FalconException {
+ HttpServletRequest bufferedRequest = new BufferedRequest(request);
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityNameMap.entrySet()) {
+ for (final String entityName : entityTypeEntry.getValue()) {
+ entityProxyUtil.proxyResume(entityTypeEntry.getKey().name(), entityName, coloExpr, bufferedRequest);
}
- } catch (FalconException | IOException e) {
- LOG.error("Error when scheduling extension job: " + jobName + ": ", e);
- throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
- return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
}
@POST
@@ -249,24 +269,23 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult resume(@PathParam("job-name") String jobName,
+ @Context HttpServletRequest request,
+ @QueryParam("colo") final String coloExpr,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ if (extensionJobsBean == null) {
+ // return failure if the extension job doesn't exist
+ LOG.error("Extension Job not found:" + jobName);
+ throw FalconWebException.newAPIException("ExtensionJob not found:" + jobName,
+ Response.Status.NOT_FOUND);
+ }
try {
- List<Entity> entities = getEntityList("", "", "", TAG_PREFIX_EXTENSION_JOB + jobName, "", doAsUser);
- if (entities.isEmpty()) {
- // return failure if the extension job doesn't exist
- return new APIResult(APIResult.Status.FAILED, "Extension job " + jobName + " doesn't exist.");
- }
-
- for (Entity entity : entities) {
- if (entity.getEntityType().isSchedulable()) {
- if (getWorkflowEngine(entity).isSuspended(entity)) {
- getWorkflowEngine(entity).resume(entity);
- }
- }
- }
- } catch (FalconException | IOException e) {
- LOG.error("Error when resuming extension job " + jobName + ": ", e);
+ SortedMap<EntityType, List<String>> entityNameMap = getJobEntities(extensionJobsBean);
+ resumeEntities(entityNameMap, coloExpr, request);
+ } catch (FalconException e) {
+ LOG.error("Error while resuming entities of the extension: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " resumed successfully");
@@ -288,11 +307,11 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
"Extension job " + jobName + " doesn't exist. Nothing to delete.");
}
- SortedMap<EntityType, List<Entity>> entityMap;
+ SortedMap<EntityType, List<String>> entityMap;
try {
entityMap = getJobEntities(extensionJobsBean);
deleteEntities(entityMap, request);
- } catch (FalconException | IOException | JAXBException e) {
+ } catch (FalconException e) {
LOG.error("Error when deleting extension job: " + jobName + ": ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -388,10 +407,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
checkIfExtensionIsEnabled(extensionName);
checkIfExtensionJobExists(jobName, extensionName);
SortedMap<EntityType, List<Entity>> entityMap;
+ SortedMap<EntityType, List<String>> entityNameMap;
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
try {
entityMap = getEntityList(extensionName, jobName, feedForms, processForms, config);
submitEntities(extensionName, jobName, entityMap, config, request);
- scheduleEntities(entityMap, request, coloExpr);
+ entityNameMap = getJobEntities(metaStore.getExtensionJobDetails(jobName));
+ scheduleEntities(entityNameMap, request, coloExpr);
} catch (FalconException | IOException | JAXBException e) {
LOG.error("Error while submitting extension job: ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -399,27 +421,13 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
}
- private void scheduleEntities(Map<EntityType, List<Entity>> entityMap, HttpServletRequest request, String coloExpr)
- throws FalconException, JAXBException, IOException {
- for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
- for (final Entity entity : entry.getValue()) {
- final HttpServletRequest httpServletRequest = getEntityStream(entity, entity.getEntityType(), request);
- final HttpServletRequest bufferedRequest = getBufferedRequest(httpServletRequest);
- final Set<String> colos = getColosFromExpression(coloExpr, entity.getEntityType().name(), entity);
-
- new EntityProxy(entity.getEntityType().toString(), entity.getName()) {
- @Override
- protected Set<String> getColosToApply() {
- return colos;
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return new EntityProxyUtil().getEntityManager(colo).invoke("schedule", bufferedRequest,
- entity.getEntityType().toString(),
- entity.getName(), colo, Boolean.FALSE, "");
- }
- }.execute();
+ private void scheduleEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request,
+ String coloExpr) throws FalconException {
+ HttpServletRequest bufferedRequest = new BufferedRequest(request);
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+ for (final String entityName : entityTypeEntry.getValue()) {
+ entityProxyUtil.proxySchedule(entityTypeEntry.getKey().name(), entityName, coloExpr,
+ Boolean.FALSE, "", bufferedRequest);
}
}
}
@@ -431,16 +439,14 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
return new BufferedRequest(request);
}
- private void deleteEntities(SortedMap<EntityType, List<Entity>> entityMap, HttpServletRequest request)
- throws IOException, JAXBException {
- for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
- for (final Entity entity : entry.getValue()) {
- final HttpServletRequest bufferedRequest = getEntityStream(entity, entity.getEntityType(), request);
- final String entityType = entity.getEntityType().toString();
- final String entityName = entity.getName();
- entityProxyUtil.proxyDelete(entityType, entityName, bufferedRequest);
+ private void deleteEntities(SortedMap<EntityType, List<String>> entityMap, HttpServletRequest request)
+ throws FalconException {
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+ for (final String entityName : entityTypeEntry.getValue()) {
+ HttpServletRequest bufferedRequest = new BufferedRequest(request);
+ entityProxyUtil.proxyDelete(entityTypeEntry.getKey().name(), entityName, bufferedRequest);
if (!embeddedMode) {
- super.delete(bufferedRequest, entityType, entityName, currentColo);
+ super.delete(bufferedRequest, entityTypeEntry.getKey().name(), entityName, currentColo);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 8f41c48..5b5d690 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -469,18 +469,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@QueryParam("properties") final String properties) {
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
- return new EntityProxy(type, entity) {
- @Override
- protected Set<String> getColosToApply() {
- return getColosFromExpression(coloExpr, type, entity);
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return entityProxyUtil.getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity,
- colo, skipDryRun, properties);
- }
- }.execute();
+ return entityProxyUtil.proxySchedule(type, entity, coloExpr, skipDryRun, properties, bufferedRequest);
}
/**
@@ -531,22 +520,11 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Dimension("colo") @QueryParam("colo") final String coloExpr) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new EntityProxy(type, entity) {
- @Override
- protected Set<String> getColosToApply() {
- return getColosFromExpression(coloExpr, type, entity);
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return entityProxyUtil.getEntityManager(colo).invoke("suspend", bufferedRequest, type, entity,
- colo);
- }
- }.execute();
+ return entityProxyUtil.proxySuspend(type, entity, coloExpr, bufferedRequest);
}
/**
- * Resume a supended entity.
+ * Resume a suspended entity.
* @param request Servlet Request
* @param type Valid options are feed or process.
* @param entity Name of the entity.
@@ -564,18 +542,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Dimension("colo") @QueryParam("colo") final String coloExpr) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
- return new EntityProxy(type, entity) {
- @Override
- protected Set<String> getColosToApply() {
- return getColosFromExpression(coloExpr, type, entity);
- }
-
- @Override
- protected APIResult doExecute(String colo) throws FalconException {
- return entityProxyUtil.getEntityManager(colo).invoke("resume", bufferedRequest, type, entity,
- colo);
- }
- }.execute();
+ return entityProxyUtil.proxyResume(type, entity, coloExpr, bufferedRequest);
}
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 6a65d2c..d76dbca 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -77,14 +77,14 @@ public class FalconUnitClient extends AbstractFalconClient {
private static final String DEFAULT_ORDER_BY = "status";
private static final String DEFAULT_SORTED_ORDER = "asc";
- protected ConfigurationStore configStore;
+ private ConfigurationStore configStore;
private AbstractWorkflowEngine workflowEngine;
private LocalSchedulableEntityManager localSchedulableEntityManager;
private LocalInstanceManager localInstanceManager;
private LocalExtensionManager localExtensionManager;
- public FalconUnitClient() throws FalconException {
+ FalconUnitClient() throws FalconException {
configStore = ConfigurationStore.get();
workflowEngine = WorkflowEngineFactory.getWorkflowEngine();
localSchedulableEntityManager = new LocalSchedulableEntityManager();
@@ -123,7 +123,6 @@ public class FalconUnitClient extends AbstractFalconClient {
* @param entityName entity name
* @param cluster cluster on which it has to be scheduled
* @return
- * @throws FalconException
*/
@Override
public APIResult schedule(EntityType entityType, String entityName, String cluster,
@@ -377,6 +376,23 @@ public class FalconUnitClient extends AbstractFalconClient {
}
}
+ @Override
+ public APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) {
+ try {
+ return localExtensionManager.suspendExtensionJob(jobName, coloExpr, doAsUser);
+ } catch (FalconException e) {
+ throw new FalconCLIException("Failed in suspending the extension job:" + jobName);
+ }
+ }
+
+ @Override
+ public APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) {
+ try {
+ return localExtensionManager.resumeExtensionJob(jobName, coloExpr, doAsUser);
+ } catch (FalconException e) {
+ throw new FalconCLIException("Failed in resuming the extension job:" + jobName);
+ }
+ }
@Override
public APIResult getExtensionJobDetails(final String jobName) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index 20ccfca..a32dbfa 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -39,7 +39,7 @@ import java.util.SortedMap;
/**
* A proxy implementation of the extension operations in local mode.
*/
-public class LocalExtensionManager extends AbstractExtensionManager {
+class LocalExtensionManager extends AbstractExtensionManager {
LocalExtensionManager() {}
APIResult submitExtensionJob(String extensionName, String jobName, InputStream configStream,
@@ -99,10 +99,10 @@ public class LocalExtensionManager extends AbstractExtensionManager {
throws FalconException, IOException{
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
- SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
- for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
- for (Entity entity : entry.getValue()) {
- scheduleInternal(entity.getEntityType().name(), entity.getName(), true, null);
+ SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) {
+ for (String entityName : entry.getValue()) {
+ scheduleInternal(entry.getKey().name(), entityName, true, null);
}
}
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " scheduled successfully");
@@ -111,10 +111,10 @@ public class LocalExtensionManager extends AbstractExtensionManager {
APIResult deleteExtensionJob(String jobName) throws FalconException, IOException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
- SortedMap<EntityType, List<Entity>> entityMap = getJobEntities(extensionJobsBean);
- for (Map.Entry<EntityType, List<Entity>> entry : entityMap.entrySet()) {
- for (Entity entity : entry.getValue()) {
- delete(entity.getEntityType().name(), entity.getName(), null);
+ SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<String>> entry : entityMap.entrySet()) {
+ for (String entityName : entry.getValue()) {
+ delete(entry.getKey().name(), entityName, null);
}
}
ExtensionStore.getMetaStore().deleteExtensionJob(jobName);
@@ -148,6 +148,30 @@ public class LocalExtensionManager extends AbstractExtensionManager {
return new APIResult(APIResult.Status.SUCCEEDED, "Updated successfully");
}
+ APIResult suspendExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+ for (String entityName : entityTypeEntry.getValue()) {
+ super.suspend(null, entityTypeEntry.getKey().name(), entityName, coloExpr);
+ }
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+ }
+
+ APIResult resumeExtensionJob(String jobName, String coloExpr, String doAsUser) throws FalconException {
+ ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
+ ExtensionJobsBean extensionJobsBean = metaStore.getExtensionJobDetails(jobName);
+ SortedMap<EntityType, List<String>> entityMap = getJobEntities(extensionJobsBean);
+ for (Map.Entry<EntityType, List<String>> entityTypeEntry : entityMap.entrySet()) {
+ for (String entityName : entityTypeEntry.getValue()) {
+ super.resume(null, entityTypeEntry.getKey().name(), entityName, coloExpr);
+ }
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED, "Extension job " + jobName + " suspended successfully");
+ }
+
APIResult registerExtensionMetadata(String extensionName, String packagePath, String description) {
return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 508a7bb..5717fc2 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -438,6 +438,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
clearDB();
submitCluster();
createExtensionPackage();
+ createDir(PROCESS_APP_PATH);
+ fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
Assert.assertEquals(result, "Extension :testExtension registered successfully.");
@@ -454,6 +456,14 @@ public class TestFalconUnit extends FalconUnitTestBase {
apiResult = getClient().scheduleExtensionJob(TEST_JOB, null, null);
assertStatus(apiResult);
+
+ apiResult = getClient().suspendExtensionJob(TEST_JOB, null, null);
+ assertStatus(apiResult);
+ apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
+ Assert.assertEquals(apiResult.getMessage(), "SUSPENDED");
+
+ apiResult = getClient().resumeExtensionJob(TEST_JOB, null, null);
+ assertStatus(apiResult);
apiResult = getClient().getStatus(EntityType.PROCESS, "sample", CLUSTER_NAME, null, false);
assertStatus(apiResult);
Assert.assertEquals(apiResult.getMessage(), "RUNNING");
@@ -487,8 +497,13 @@ public class TestFalconUnit extends FalconUnitTestBase {
}
}
+ @Test
+ public void testExtensionJobSuspendAndResume() throws Exception {
+
+ }
+
- void copyExtensionJar(String destDirPath) throws IOException {
+ private void copyExtensionJar(String destDirPath) throws IOException {
File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
for (File file : dir.listFiles()) {
if (file.toString().endsWith(".jar")) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/86086653/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
index 3a6c9c0..ac05b0f 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ExtensionManager.java
@@ -130,6 +130,28 @@ public class ExtensionManager extends AbstractExtensionManager {
+ "on Prism.");
}
+ @POST
+ @Path("suspend/{job-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ public APIResult suspend(@PathParam("job-name") String jobName,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ LOG.error("Suspend of an extension job is not supported on Server.Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("Suspend of an extension job is not supported on Server."
+ + "Please run your operation on Prism.");
+ }
+
+ @POST
+ @Path("resume/{job-name}")
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ public APIResult resume(@PathParam("job-name") String jobName,
+ @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ LOG.error("Resume of an extension job is not supported on Server.Please run your operation on Prism ");
+ throw FalconWebException.newAPIException("Resume of an extension job is not supported on Server."
+ + "Please run your operation on Prism.");
+ }
+
@GET
@Path("definition/{extension-name}")
@Produces({MediaType.APPLICATION_JSON})