You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/26 12:14:57 UTC
falcon git commit: FALCON-1520 Delete, update,
Validate entity operations support in Falcon Unit (by Pavan Kolamuri)
Repository: falcon
Updated Branches:
refs/heads/master c30fce751 -> 7854e3d90
FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (by Pavan Kolamuri)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7854e3d9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7854e3d9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7854e3d9
Branch: refs/heads/master
Commit: 7854e3d90e7a5258b85f45afd00694e3f5157142
Parents: c30fce7
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Oct 26 16:44:30 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Oct 26 16:44:30 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../falcon/client/AbstractFalconClient.java | 61 +++++-
.../falcon/client/FalconCLIException.java | 4 +
.../falcon/entity/store/ConfigurationStore.java | 2 +
.../oozie/client/LocalProxyOozieClient.java | 4 +-
.../falcon/resource/AbstractEntityManager.java | 47 +++--
.../AbstractSchedulableEntityManager.java | 4 +-
.../proxy/SchedulableEntityManagerProxy.java | 4 +-
.../apache/falcon/unit/FalconUnitClient.java | 89 +++++----
.../unit/LocalSchedulableEntityManager.java | 31 +++-
.../apache/falcon/unit/FalconUnitTestBase.java | 11 +-
.../org/apache/falcon/unit/TestFalconUnit.java | 185 +++++++++++++++----
unit/src/test/resources/process1.xml | 50 -----
13 files changed, 341 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 95dd69d..c00c265 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ Trunk (Unreleased)
FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
IMPROVEMENTS
+ FALCON-1520 Delete, update, Validate entity operations support in Falcon Unit (Pavan Kolamuri via Pallavi Rao)
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index b889931..91d5324 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -18,6 +18,7 @@
package org.apache.falcon.client;
import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
@@ -31,12 +32,14 @@ import java.util.List;
*/
public abstract class AbstractFalconClient {
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
/**
* Submit a new entity. Entities can be of type feed, process or data end
* points. Entity definitions are validated structurally against schema and
* subsequently for other rules before they are admitted into the system.
- * @param entityType
- * @param filePath
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param filePath Path for the entity definition
* @return
* @throws FalconCLIException
*/
@@ -45,17 +48,63 @@ public abstract class AbstractFalconClient {
/**
Schedules a submitted process entity immediately.
- * @param entityType
- * @param entityName
- * @param colo
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param entityName Name of the entity.
+ * @param colo Cluster name.
* @return
* @throws FalconCLIException
*/
public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun,
String doAsuser, String properties) throws FalconCLIException;
+ /**
+ * Delete the specified entity.
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param entityName Name of the entity.
+ * @param doAsUser Proxy User.
+ * @return
+ * @throws FalconCLIException
+ */
+ public abstract APIResult delete(EntityType entityType, String entityName,
+ String doAsUser) throws FalconCLIException;
+
+ /**
+ * Validates the submitted entity.
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param filePath Path for the entity definition to validate.
+ * @param skipDryRun Dry run.
+ * @param doAsUser Proxy User.
+ * @return
+ * @throws FalconCLIException
+ */
+ public abstract APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+ String doAsUser) throws FalconCLIException;
+
+ /**
+ * Updates the submitted entity.
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param entityName Name of the entity.
+ * @param filePath Path for the entity definition to update.
+ * @param skipDryRun Dry run.
+ * @param doAsUser Proxy User.
+ * @return
+ * @throws FalconCLIException
+ */
+ public abstract APIResult update(String entityType, String entityName, String filePath,
+ Boolean skipDryRun, String doAsUser) throws FalconCLIException;
+
+ /**
+ * Get definition of the entity.
+ * @param entityType Entity type. Valid options are cluster, feed or process.
+ * @param entityName Name of the entity.
+ * @param doAsUser Proxy user.
+ * @return
+ * @throws FalconCLIException
+ */
+ public abstract Entity getDefinition(String entityType, String entityName,
+ String doAsUser) throws FalconCLIException;
+
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
/**
*
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
index ec74c27..51ef952 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconCLIException.java
@@ -36,6 +36,10 @@ public class FalconCLIException extends Exception {
super(msg);
}
+ public FalconCLIException(Throwable e) {
+ super(e);
+ }
+
public FalconCLIException(String msg, Throwable throwable) {
super(msg, throwable);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index e27187b..4dd1c68 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -60,6 +60,8 @@ public final class ConfigurationStore implements FalconService {
private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, };
+ public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
+ EntityType.CLUSTER, };
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
index 756828f..c2100d1 100644
--- a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java
@@ -166,7 +166,7 @@ public class LocalProxyOozieClient extends OozieClient {
@Override
public void reRun(String jobId, Properties conf) throws OozieClientException {
- throw new IllegalStateException("Rerun not supported ");
+ getClient(jobId).reRun(jobId, conf);
}
@Override
@@ -181,7 +181,7 @@ public class LocalProxyOozieClient extends OozieClient {
@Override
public void kill(String jobId) throws OozieClientException {
- throw new IllegalStateException("Kill not supported");
+ getClient(jobId).kill(jobId);
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 3323dd1..16ef83a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -75,7 +75,7 @@ import java.util.Set;
public abstract class AbstractEntityManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractEntityManager.class);
private static MemoryLocks memoryLocks = MemoryLocks.getInstance();
- private static final String DO_AS_PARAM = "doAs";
+ protected static final String DO_AS_PARAM = "doAs";
protected static final int XML_DEBUG_LEN = 10 * 1024;
private AbstractWorkflowEngine workflowEngine;
@@ -195,7 +195,8 @@ public abstract class AbstractEntityManager {
checkColo(colo);
try {
- Entity entity = submitInternal(request, type);
+ String doAsUser = request.getParameter(DO_AS_PARAM);
+ Entity entity = submitInternal(request.getInputStream(), type, doAsUser);
return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName());
} catch (Throwable e) {
LOG.error("Unable to persist entity object", e);
@@ -205,15 +206,24 @@ public abstract class AbstractEntityManager {
/**
* Post an entity XML with entity type. Validates the XML which can be
- * Process, Feed or Dataendpoint
+ * Process, Feed or Data endpoint
*
* @param type entity type
- * @return APIResule -Succeeded or Failed
+ * @return APIResult -Succeeded or Failed
*/
public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) {
try {
+ return validate(request.getInputStream(), type, skipDryRun);
+ } catch (IOException e) {
+ LOG.error("Unable to get InputStream from Request", request, e);
+ throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ protected APIResult validate(InputStream inputStream, String type, Boolean skipDryRun) {
+ try {
EntityType entityType = EntityType.getEnum(type);
- Entity entity = deserializeEntity(request, entityType);
+ Entity entity = deserializeEntity(inputStream, entityType);
validate(entity);
//Validate that the entity can be scheduled in the cluster
@@ -244,6 +254,11 @@ public abstract class AbstractEntityManager {
* @return APIResult
*/
public APIResult delete(HttpServletRequest request, String type, String entity, String colo) {
+ return delete(type, entity, colo);
+
+ }
+
+ protected APIResult delete(String type, String entity, String colo) {
checkColo(colo);
List<Entity> tokenList = new ArrayList<>();
try {
@@ -277,12 +292,23 @@ public abstract class AbstractEntityManager {
public APIResult update(HttpServletRequest request, String type, String entityName,
String colo, Boolean skipDryRun) {
+ try {
+ return update(request.getInputStream(), type, entityName, colo, skipDryRun);
+ } catch (IOException e) {
+ LOG.error("Unable to get InputStream from Request", request, e);
+ throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+ }
+
+ }
+
+ protected APIResult update(InputStream inputStream, String type, String entityName,
+ String colo, Boolean skipDryRun) {
checkColo(colo);
List<Entity> tokenList = new ArrayList<>();
try {
EntityType entityType = EntityType.getEnum(type);
Entity oldEntity = EntityUtil.getEntity(type, entityName);
- Entity newEntity = deserializeEntity(request, entityType);
+ Entity newEntity = deserializeEntity(inputStream, entityType);
// KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
decorateEntityWithACL(newEntity);
validate(newEntity);
@@ -309,7 +335,6 @@ public abstract class AbstractEntityManager {
}
configStore.update(entityType, newEntity);
-
return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
} catch (Throwable e) {
LOG.error("Update failed", e);
@@ -399,11 +424,11 @@ public abstract class AbstractEntityManager {
}
}
- protected Entity submitInternal(HttpServletRequest request, String type)
+ protected Entity submitInternal(InputStream inputStream, String type, String doAsUser)
throws IOException, FalconException {
EntityType entityType = EntityType.getEnum(type);
- Entity entity = deserializeEntity(request, entityType);
+ Entity entity = deserializeEntity(inputStream, entityType);
List<Entity> tokenList = new ArrayList<>();
// KLUDGE - Until ACL is mandated entity passed should be decorated for equals check to pass
decorateEntityWithACL(entity);
@@ -425,7 +450,6 @@ public abstract class AbstractEntityManager {
+ "Can't be submitted again. Try removing before submitting.");
}
- String doAsUser = request.getParameter(DO_AS_PARAM);
SecurityUtil.tryProxy(entity, doAsUser); // proxy before validating since FS/Oozie needs to be proxied
validate(entity);
configStore.publish(entityType, entity);
@@ -477,11 +501,10 @@ public abstract class AbstractEntityManager {
}
}
- protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
+ protected Entity deserializeEntity(InputStream xmlStream, EntityType entityType)
throws IOException, FalconException {
EntityParser<?> entityParser = EntityParserFactory.getParser(entityType);
- InputStream xmlStream = request.getInputStream();
if (xmlStream.markSupported()) {
xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 0db55df..d317aa1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -82,7 +82,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
}
}
- private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
+ protected synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun,
Map<String, String> properties) throws FalconException, AuthorizationException {
checkSchedulableEntity(type);
@@ -187,7 +187,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
checkColo(colo);
try {
checkSchedulableEntity(type);
- Entity entity = submitInternal(request, type);
+ Entity entity = submitInternal(request.getInputStream(), type, request.getParameter(DO_AS_PARAM));
scheduleInternal(type, entity.getName(), skipDryRun, EntityUtil.getPropertyMap(properties));
return new APIResult(APIResult.Status.SUCCEEDED,
entity.getName() + "(" + type + ") scheduled successfully");
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 9d13d74..d3ba189 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -199,7 +199,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
private Entity getEntity(HttpServletRequest request, String type) {
try {
request.getInputStream().reset();
- Entity entity = deserializeEntity(request, EntityType.getEnum(type));
+ Entity entity = deserializeEntity(request.getInputStream(), EntityType.getEnum(type));
request.getInputStream().reset();
return entity;
} catch (Exception e) {
@@ -225,7 +225,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
EntityType entityType = EntityType.getEnum(type);
final Entity entity;
try {
- entity = deserializeEntity(bufferedRequest, entityType);
+ entity = deserializeEntity(bufferedRequest.getInputStream(), entityType);
bufferedRequest.getInputStream().reset();
} catch (Exception e) {
throw FalconWebException.newException("Unable to parse the request", Response.Status.BAD_REQUEST);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 783af19..b5afae3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -23,8 +23,6 @@ import org.apache.falcon.LifeCycle;
import org.apache.falcon.client.AbstractFalconClient;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.parser.EntityParser;
-import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -42,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -55,6 +52,9 @@ public class FalconUnitClient extends AbstractFalconClient {
private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class);
+ private static final String DEFAULT_ORDERBY = "status";
+ private static final String DEFAULT_SORTED_ORDER = "asc";
+
protected ConfigurationStore configStore;
private AbstractWorkflowEngine workflowEngine;
private LocalSchedulableEntityManager localSchedulableEntityManager;
@@ -84,29 +84,9 @@ public class FalconUnitClient extends AbstractFalconClient {
*/
@Override
public APIResult submit(String type, String filePath, String doAsUser) throws IOException, FalconCLIException {
- try {
- EntityType entityType = EntityType.getEnum(type);
- InputStream entityStream = FalconUnitHelper.getFileInputStream(filePath);
- EntityParser entityParser = EntityParserFactory.getParser(entityType);
- Entity entity = entityParser.parse(entityStream);
-
- Entity existingEntity = configStore.get(entityType, entity.getName());
- if (existingEntity != null) {
- if (EntityUtil.equals(existingEntity, entity)) {
- LOG.warn(entity.toShortString() + " already registered with same definition " + entity.getName());
- return new APIResult(APIResult.Status.SUCCEEDED, "{} already registered with same definition"
- + entity.getName());
- }
- LOG.warn(entity.toShortString() + " already registered with different definition "
- + "Can't be submitted again. Try removing before submitting.");
- return new APIResult(APIResult.Status.FAILED, "{} already registered with different definition "
- + "Can't be submitted again. Try removing before submitting." + entity.getName());
- }
- entityParser.validate(entity);
- configStore.publish(entityType, entity);
- LOG.info("Submit successful: ({}): {}", entityType.name(), entity.getName());
- return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName());
+ try {
+ return localSchedulableEntityManager.submit(type, filePath, doAsUser);
} catch (FalconException e) {
throw new FalconCLIException("FAILED", e);
}
@@ -128,12 +108,56 @@ public class FalconUnitClient extends AbstractFalconClient {
return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties);
}
+ @Override
+ public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
+ return localSchedulableEntityManager.delete(entityType, entityName, doAsUser);
+ }
+
+ @Override
+ public APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+ String doAsUser) throws FalconCLIException {
+ try {
+ return localSchedulableEntityManager.validate(entityType, filePath, skipDryRun, doAsUser);
+ } catch (FalconException e) {
+ throw new FalconCLIException(e);
+ }
+ }
+
+ @Override
+ public APIResult update(String entityType, String entityName, String filePath,
+ Boolean skipDryRun, String doAsUser) throws FalconCLIException {
+ try {
+ return localSchedulableEntityManager.update(entityType, entityName, filePath,
+ skipDryRun, "local", doAsUser);
+ } catch (FalconException e) {
+ throw new FalconCLIException(e);
+ }
+ }
+
+ @Override
+ public Entity getDefinition(String entityType, String entityName, String doAsUser) throws FalconCLIException {
+ String entity = localSchedulableEntityManager.getEntityDefinition(entityType, entityName);
+ return Entity.fromString(EntityType.getEnum(entityType), entity);
+ }
+
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@Override
public InstancesResult getStatusOfInstances(String type, String entity, String start, String end,
String colo, List<LifeCycle> lifeCycles, String filterBy,
String orderBy, String sortOrder, Integer offset,
Integer numResults, String doAsUser) throws FalconCLIException {
+ if (orderBy == null) {
+ orderBy = DEFAULT_ORDERBY;
+ }
+ if (sortOrder == null) {
+ sortOrder = DEFAULT_SORTED_ORDER;
+ }
+ if (offset == null) {
+ offset = 0;
+ }
+ if (numResults == null) {
+ numResults = 1;
+ }
return localInstanceManager.getStatusOfInstances(type, entity, start, end, colo, lifeCycles, filterBy, orderBy,
sortOrder, offset, numResults);
@@ -164,7 +188,7 @@ public class FalconUnitClient extends AbstractFalconClient {
if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) {
updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
}
- workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties));
+ workflowEngine.schedule(entity, skipDryRun, EntityUtil.getPropertyMap(properties));
LOG.info(entityName + " is scheduled successfully");
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully");
} catch (FalconException e) {
@@ -180,16 +204,13 @@ public class FalconUnitClient extends AbstractFalconClient {
* @param nominalTime nominal time of process
* @return InstancesResult.WorkflowStatus
*/
- public InstancesResult.WorkflowStatus getInstanceStatus(EntityType entityType, String entityName,
+ public InstancesResult.WorkflowStatus getInstanceStatus(String entityType, String entityName,
String nominalTime) throws Exception {
- if (entityType == EntityType.CLUSTER) {
- throw new IllegalArgumentException("Instance management functions don't apply to Cluster entities");
- }
- Entity entityObject = EntityUtil.getEntity(entityType, entityName);
Date startTime = SchemaHelper.parseDateUTC(nominalTime);
- Date endTime = DateUtil.getNextMinute(startTime);
- List<LifeCycle> lifeCycles = FalconUnitHelper.checkAndUpdateLifeCycle(null, entityType.name());
- InstancesResult instancesResult = workflowEngine.getStatus(entityObject, startTime, endTime, lifeCycles);
+ Date endTimeDate = DateUtil.getNextMinute(startTime);
+ String endTime = DateUtil.getDateFormatFromTime(endTimeDate.getTime());
+ InstancesResult instancesResult = getStatusOfInstances(entityType, entityName, nominalTime, endTime, null,
+ null, null, null, null, null, null, null);
if (instancesResult.getInstances() != null && instancesResult.getInstances().length > 0
&& instancesResult.getInstances()[0] != null) {
LOG.info("Instance status is " + instancesResult.getInstances()[0].getStatus());
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
index 8b1c435..42adc9a 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java
@@ -17,14 +17,19 @@
*/
package org.apache.falcon.unit;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
* A proxy implementation of the schedulable entity operations in local mode.
*/
public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager {
- // Created for future purposes to add all entity API's here for falcon unit.
public LocalSchedulableEntityManager() {}
@@ -40,4 +45,28 @@ public class LocalSchedulableEntityManager extends AbstractSchedulableEntityMana
return super.getStatus(type, entity, colo);
}
+ public APIResult delete(EntityType entityType, String entityName, String doAsUser) {
+ if (entityType == null) {
+ throw new IllegalStateException("Entity-Type cannot be null");
+ }
+ return super.delete(entityType.name(), entityName, doAsUser);
+ }
+
+ public APIResult validate(String entityType, String filePath, Boolean skipDryRun,
+ String doAsUser) throws FalconException {
+ InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+ return super.validate(inputStream, entityType, skipDryRun);
+ }
+
+ public APIResult update(String entityType, String entityName, String filePath,
+ Boolean skipDryRun, String doAsUser, String colo) throws FalconException {
+ InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+ return super.update(inputStream, entityType, entityName, colo, skipDryRun);
+ }
+
+ public APIResult submit(String entityType, String filePath, String doAsUser) throws FalconException, IOException {
+ InputStream inputStream = FalconUnitHelper.getFileInputStream(filePath);
+ Entity entity = super.submitInternal(inputStream, entityType, doAsUser);
+ return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + entityType + ") " + entity.getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index d12efbc..ac478f4 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -77,7 +77,6 @@ public class FalconUnitTestBase {
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
public interface Predicate {
-
boolean evaluate() throws Exception;
}
@@ -122,9 +121,9 @@ public class FalconUnitTestBase {
@AfterMethod
public void cleanUpActionXml() throws IOException, FalconException {
- for (EntityType type : EntityType.values()) {
+ for (EntityType type : ConfigurationStore.ENTITY_DELETE_ORDER) {
for (String name : ConfigurationStore.get().getEntities(type)) {
- ConfigurationStore.get().remove(type, name);
+ getClient().delete(type, name, null);
}
}
//Needed since oozie writes action xml to current directory.
@@ -275,7 +274,7 @@ public class FalconUnitTestBase {
String inputFile) throws FalconException, ParseException, IOException {
String feedPath = getFeedPathForTS(cluster, feedName, time);
fs.mkdirs(new Path(feedPath));
- fs.copyFromLocalFile(new Path(getAbsolutePath("/" + inputFile)), new Path(feedPath));
+ fs.copyFromLocalFile(new Path(getAbsolutePath(inputFile)), new Path(feedPath));
}
protected String getFeedPathForTS(String cluster, String feedName,
@@ -295,7 +294,7 @@ public class FalconUnitTestBase {
public String getAbsolutePath(String fileName) {
- return this.getClass().getResource(fileName).getPath();
+ return this.getClass().getResource("/" + fileName).getPath();
}
public void createDir(String path) throws IOException {
@@ -333,7 +332,7 @@ public class FalconUnitTestBase {
}
}
- protected long waitForStatus(final EntityType entityType, final String entityName, final String instanceTime) {
+ protected long waitForStatus(final String entityType, final String entityName, final String instanceTime) {
return waitFor(WAIT_TIME, new Predicate() {
public boolean evaluate() throws Exception {
InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType,
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index d504bd2..8cdbd88 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -18,8 +18,11 @@
package org.apache.falcon.unit;
import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.hadoop.fs.FileStatus;
@@ -27,36 +30,47 @@ import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.text.ParseException;
+import static org.apache.falcon.entity.EntityUtil.getEntity;
+
+
/**
* Test cases of falcon jobs using Local Oozie and LocalJobRunner.
*/
public class TestFalconUnit extends FalconUnitTestBase {
+ private static final String INPUT_FEED = "infeed.xml";
+ private static final String OUTPUT_FEED = "outfeed.xml";
+ private static final String PROCESS = "process.xml";
+ private static final String PROCESS_APP_PATH = "/app/oozie-mr";
+ private static final String CLUSTER_NAME = "local";
+ private static final String INPUT_FEED_NAME = "in";
+ private static final String PROCESS_NAME = "process";
+ private static final String OUTPUT_FEED_NAME = "out";
+ private static final String INPUT_FILE_NAME = "input.txt";
+ private static final String SCHEDULE_TIME = "2015-06-20T00:00Z";
+ private static final String WORKFLOW = "workflow.xml";
+
@Test
public void testProcessInstanceExecution() throws Exception {
- // submit with default props
- submitCluster();
- // submitting feeds
- APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
- assertStatus(result);
- result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
- assertStatus(result);
+ submitClusterAndFeeds();
// submitting and scheduling process
- String scheduleTime = "2015-06-20T00:00Z";
- createData("in", "local", scheduleTime, "input.txt");
- result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr");
+ createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+ APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
assertStatus(result);
- result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"),
+ result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 1, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
true, "");
assertStatus(result);
- waitForStatus(EntityType.PROCESS, "process", scheduleTime);
- InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS,
- "process", scheduleTime);
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+ InstancesResult.WorkflowStatus status = getClient().getInstanceStatus(EntityType.PROCESS.name(),
+ PROCESS_NAME, SCHEDULE_TIME);
Assert.assertEquals(status, InstancesResult.WorkflowStatus.SUCCEEDED);
- String outPath = getFeedPathForTS("local", "out", scheduleTime);
+ String outPath = getFeedPathForTS(CLUSTER_NAME, OUTPUT_FEED_NAME, SCHEDULE_TIME);
Assert.assertTrue(getFileSystem().exists(new Path(outPath)));
FileStatus[] files = getFileSystem().listStatus(new Path(outPath));
Assert.assertTrue(files.length > 0);
@@ -69,52 +83,149 @@ public class TestFalconUnit extends FalconUnitTestBase {
// submit with default props
submitCluster();
// submitting feeds
- APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
+ APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED));
assertStatus(result);
- String scheduleTime = "2015-06-20T00:00Z";
- createData("in", "local", scheduleTime, "input.txt");
- String inPath = getFeedPathForTS("local", "in", scheduleTime);
+ createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+ String inPath = getFeedPathForTS(CLUSTER_NAME, INPUT_FEED_NAME, SCHEDULE_TIME);
Assert.assertTrue(fs.exists(new Path(inPath)));
- result = schedule(EntityType.FEED, "in", "local");
+ result = schedule(EntityType.FEED, INPUT_FEED_NAME, CLUSTER_NAME);
Assert.assertEquals(APIResult.Status.SUCCEEDED, result.getStatus());
waitFor(WAIT_TIME, new Predicate() {
public boolean evaluate() throws Exception {
- InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+ InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME);
return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status);
}
});
- InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local");
+ InstancesResult.WorkflowStatus status = getRetentionStatus(INPUT_FEED_NAME, CLUSTER_NAME);
Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status);
Assert.assertFalse(fs.exists(new Path(inPath)));
}
@Test
public void testSuspendAndResume() throws Exception {
- // submit with default props
- submitCluster();
- // submitting feeds
- APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml"));
- assertStatus(result);
- result = submit(EntityType.FEED, getAbsolutePath("/outfeed.xml"));
- assertStatus(result);
+ submitClusterAndFeeds();
// submitting and scheduling process
String scheduleTime = "2015-06-20T00:00Z";
- createData("in", "local", scheduleTime, "input.txt");
- result = submitProcess(getAbsolutePath("/process1.xml"), "/app/oozie-mr");
+ createData(INPUT_FEED_NAME, CLUSTER_NAME, scheduleTime, INPUT_FILE_NAME);
+ APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
assertStatus(result);
- result = scheduleProcess("process1", scheduleTime, 2, "local", getAbsolutePath("/workflow.xml"),
+ result = scheduleProcess(PROCESS_NAME, scheduleTime, 2, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
true, "");
assertStatus(result);
- waitForStatus(EntityType.PROCESS, "process1", scheduleTime);
- result = getClient().suspend(EntityType.PROCESS, "process1", "local", null);
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, scheduleTime);
+ result = getClient().suspend(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
assertStatus(result);
- result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+ result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
assertStatus(result);
Assert.assertEquals(result.getMessage(), "SUSPENDED");
- result = getClient().resume(EntityType.PROCESS, "process1", "local", null);
+ result = getClient().resume(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
assertStatus(result);
- result = getClient().getStatus(EntityType.PROCESS, "process1", "local", null);
+ result = getClient().getStatus(EntityType.PROCESS, PROCESS_NAME, CLUSTER_NAME, null);
assertStatus(result);
Assert.assertEquals(result.getMessage(), "RUNNING");
}
+
+ @Test
+ public void testDelete() throws IOException, FalconCLIException, FalconException,
+ ParseException, InterruptedException {
+ // submit cluster and feeds
+ submitClusterAndFeeds();
+ createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+ APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+ assertStatus(result);
+ result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
+ true, "");
+ assertStatus(result);
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+ result = getClient().delete(EntityType.PROCESS, PROCESS_NAME, null);
+ assertStatus(result);
+ try {
+ getEntity(EntityType.PROCESS, PROCESS_NAME);
+ Assert.fail("Exception should be thrown");
+ } catch (FalconException e) {
+ // nothing to do
+ }
+
+ result = getClient().delete(EntityType.FEED, INPUT_FEED_NAME, null);
+ assertStatus(result);
+ try {
+ getEntity(EntityType.FEED, INPUT_FEED_NAME);
+ Assert.fail("Exception should be thrown");
+ } catch (FalconException e) {
+ // nothing to do
+ }
+ }
+
+ @Test
+ public void testValidate() throws IOException, FalconCLIException, FalconException {
+ submitClusterAndFeeds();
+ APIResult result = getClient().validate(EntityType.PROCESS.name(),
+ getAbsolutePath(PROCESS), true, null);
+ assertStatus(result);
+ try {
+ getClient().validate(EntityType.PROCESS.name(),
+ getAbsolutePath(INPUT_FEED), true, null);
+ Assert.fail("Exception should be thrown");
+ } catch (FalconWebException e) {
+ // nothing to do
+ }
+ }
+
+ @Test
+ public void testUpdate() throws IOException, FalconCLIException, FalconException,
+ ParseException, InterruptedException {
+ submitClusterAndFeeds();
+ APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+ assertStatus(result);
+ result = getClient().update(EntityType.PROCESS.name(), PROCESS_NAME,
+ getAbsolutePath(PROCESS), true, null);
+ assertStatus(result);
+ createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
+ result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
+ assertStatus(result);
+ result = scheduleProcess(PROCESS_NAME, SCHEDULE_TIME, 10, CLUSTER_NAME, getAbsolutePath(WORKFLOW),
+ true, "");
+ assertStatus(result);
+ waitForStatus(EntityType.PROCESS.name(), PROCESS_NAME, SCHEDULE_TIME);
+
+ Process process = getEntity(EntityType.PROCESS, PROCESS_NAME);
+ setDummyProperty(process);
+ String processXml = process.toString();
+
+ File file = new File("target/newprocess.xml");
+ file.createNewFile();
+ try (BufferedWriter bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()))) {
+ bw.write(processXml);
+ }
+
+ result = falconUnitClient.update(EntityType.PROCESS.name(), PROCESS_NAME, file.getAbsolutePath(), true, null);
+ assertStatus(result);
+
+ process = getEntity(EntityType.PROCESS,
+ PROCESS_NAME);
+ Assert.assertEquals(process.toString(), processXml);
+ file.delete();
+ }
+
+ private void submitClusterAndFeeds() throws IOException, FalconCLIException {
+ // submit with default props
+ submitCluster();
+ // submitting feeds
+ APIResult result = submit(EntityType.FEED, getAbsolutePath(INPUT_FEED));
+ assertStatus(result);
+ result = submit(EntityType.FEED, getAbsolutePath(OUTPUT_FEED));
+ assertStatus(result);
+ }
+
+ private void setDummyProperty(Process process) {
+ Property property = new Property();
+ property.setName("dummy");
+ property.setValue("dummy");
+ process.getProperties().getProperties().add(property);
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7854e3d9/unit/src/test/resources/process1.xml
----------------------------------------------------------------------
diff --git a/unit/src/test/resources/process1.xml b/unit/src/test/resources/process1.xml
deleted file mode 100644
index 37dbb9c..0000000
--- a/unit/src/test/resources/process1.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<process name="process1" xmlns="uri:falcon:process:0.1">
- <clusters>
- <cluster name="local">
- <validity start="2013-11-18T00:05Z" end="2013-11-18T01:05Z"/>
- </cluster>
- </clusters>
-
- <parallel>5</parallel>
- <order>FIFO</order>
- <frequency>minutes(1)</frequency>
- <timezone>UTC</timezone>
-
- <inputs>
- <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
- <input name="inpaths" feed="in" start="now(0,0)" end="now(0,0)" />
- </inputs>
-
- <outputs>
- <!-- In the workflow, the output path will be available in a variable 'outpath' -->
- <output name="outpath" feed="out" instance="now(0,0)"/>
- </outputs>
-
- <properties>
- <!-- In the workflow, these properties will be available with variable - key -->
- <property name="queueName" value="default"/>
- <!-- The schedule time available as a property in workflow -->
- <property name="time" value="${instanceTime()}"/>
- </properties>
-
- <workflow engine="oozie" path="/app/oozie-mr"/>
-</process>