You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/25 13:51:20 UTC
[1/3] falcon git commit: FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted. Contributed by Paul Isaychuk.
Repository: falcon
Updated Branches:
refs/heads/0.7 44ca0bdbc -> fbb4d3142
FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted. Contributed by Paul Isaychuk.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ba83ad21
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ba83ad21
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ba83ad21
Branch: refs/heads/0.7
Commit: ba83ad21fd361cd8f40c68184587307b4c8d9150
Parents: 44ca0bd
Author: Paul Isaychuk <pi...@apache.org>
Authored: Fri Aug 21 12:34:33 2015 +0300
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:30 2015 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 ++
.../org/apache/falcon/regression/prism/RetentionTest.java | 7 +++++--
2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ba83ad21/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 7ad0f3d..af5e660 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted (Paul Isaychuk)
+
FALCON-1321 Add Entity Lineage Test (Pragya Mittal via Ajay Yadava)
FALCON-1319 Contribute HiveDr, Mirror tests and some test fixes (Namit Maheshwari, Paul Isaychuk,
http://git-wip-us.apache.org/repos/asf/falcon/blob/ba83ad21/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 628dca1..8f45d1c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -37,6 +37,7 @@ import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
@@ -141,7 +142,7 @@ public class RetentionTest extends BaseTestClass {
}
final DateTime today = new DateTime(DateTimeZone.UTC);
final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
- freqType.addTime(today, -36), freqType.addTime(today, 36), skip, freqType);
+ freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
LOGGER.info("dataDates = " + dataDates);
dataDates.add(HadoopUtil.SOMETHING_RANDOM);
@@ -206,6 +207,9 @@ public class RetentionTest extends BaseTestClass {
Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
expectedOutput.toArray(new String[expectedOutput.size()])));
+
+ //check that root directory exists
+ Assert.assertTrue(clusterFS.exists(new Path(testHDFSDir)), "Base data directory should be present.");
}
/**
@@ -273,7 +277,6 @@ public class RetentionTest extends BaseTestClass {
return finalData;
}
-
/**
* Provides different sets of parameters for retention workflow.
*/
[2/3] falcon git commit: FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki.
Posted by aj...@apache.org.
FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/17e2f71c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/17e2f71c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/17e2f71c
Branch: refs/heads/0.7
Commit: 17e2f71cd7b1a2681abdec758cc5327dc4c543ea
Parents: ba83ad2
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 12:41:35 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:45 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../java/org/apache/falcon/cli/FalconCLI.java | 33 ++++++++---
.../falcon/client/AbstractFalconClient.java | 3 +-
.../org/apache/falcon/client/FalconClient.java | 55 ++++++++++-------
.../workflow/engine/AbstractWorkflowEngine.java | 9 +--
.../workflow/engine/OozieWorkflowEngine.java | 62 +++++++++++++-------
.../falcon/resource/AbstractEntityManager.java | 9 +--
.../AbstractSchedulableEntityManager.java | 19 +++---
.../proxy/SchedulableEntityManagerProxy.java | 28 +++++----
.../falcon/resource/EntityManagerTest.java | 6 +-
.../apache/falcon/unit/FalconUnitClient.java | 9 +--
.../apache/falcon/unit/FalconUnitTestBase.java | 8 +--
.../org/apache/falcon/unit/TestFalconUnit.java | 2 +-
.../falcon/resource/ConfigSyncService.java | 5 +-
.../resource/SchedulableEntityManager.java | 15 +++--
.../java/org/apache/falcon/cli/FalconCLIIT.java | 27 +++++++++
.../falcon/resource/EntityManagerJerseyIT.java | 41 ++++++++-----
.../org/apache/falcon/resource/TestContext.java | 31 ++++++++--
18 files changed, 245 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f31f839..a1054fe 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,8 @@ Trunk (Unreleased)
FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
IMPROVEMENTS
+ FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity(Balu Vellanki via Ajay Yadava)
+
FALCON-1374 Remove the cap on numResults(Pragya Mittal via Ajay Yadava)
FALCON-1379 Doc describes retention incorrectly(Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 11f6bff..11dfe72 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -90,6 +90,7 @@ public class FalconCLI {
public static final String PATH_OPT = "path";
public static final String LIST_OPT = "list";
public static final String TOUCH_OPT = "touch";
+ public static final String SKIPDRYRUN_OPT = "skipDryRun";
public static final String FIELDS_OPT = "fields";
public static final String FILTER_BY_OPT = "filterBy";
@@ -429,6 +430,11 @@ public class FalconCLI {
Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
null, "numResults");
Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
+ Boolean skipDryRun = null;
+ if (optionsList.contains(SKIPDRYRUN_OPT)) {
+ skipDryRun = true;
+ }
+
EntityType entityTypeEnum = null;
if (optionsList.contains(LIST_OPT)) {
if (entityType == null) {
@@ -460,20 +466,19 @@ public class FalconCLI {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
validateNotEmpty(entityName, ENTITY_NAME_OPT);
- result = client.update(entityType, entityName, filePath).getMessage();
+ result = client.update(entityType, entityName, filePath, skipDryRun).getMessage();
} else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
- result =
- client.submitAndSchedule(entityType, filePath).getMessage();
+ result = client.submitAndSchedule(entityType, filePath, skipDryRun).getMessage();
} else if (optionsList.contains(VALIDATE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
- result = client.validate(entityType, filePath).getMessage();
+ result = client.validate(entityType, filePath, skipDryRun).getMessage();
} else if (optionsList.contains(SCHEDULE_OPT)) {
validateNotEmpty(entityName, ENTITY_NAME_OPT);
colo = getColo(colo);
- result = client.schedule(entityTypeEnum, entityName, colo).getMessage();
+ result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun).getMessage();
} else if (optionsList.contains(SUSPEND_OPT)) {
validateNotEmpty(entityName, ENTITY_NAME_OPT);
colo = getColo(colo);
@@ -522,7 +527,7 @@ public class FalconCLI {
} else if (optionsList.contains(TOUCH_OPT)) {
validateNotEmpty(entityName, ENTITY_NAME_OPT);
colo = getColo(colo);
- result = client.touch(entityType, entityName, colo).getMessage();
+ result = client.touch(entityType, entityName, colo, skipDryRun).getMessage();
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
} else {
@@ -742,6 +747,7 @@ public class FalconCLI {
Option numInstances = new Option(NUM_INSTANCES_OPT, true,
"Number of instances to return per entity summary request");
Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
+ Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
entityOptions.addOption(url);
entityOptions.addOption(path);
@@ -763,6 +769,7 @@ public class FalconCLI {
entityOptions.addOption(offset);
entityOptions.addOption(numResults);
entityOptions.addOption(numInstances);
+ entityOptions.addOption(skipDryRun);
return entityOptions;
}
@@ -926,6 +933,9 @@ public class FalconCLI {
Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
recipeOptions.addOption(recipeOperation);
+ Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip dryrun operation");
+ recipeOptions.addOption(skipDryRunOperation);
+
return recipeOptions;
}
@@ -1015,6 +1025,11 @@ public class FalconCLI {
}
private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+ Set<String> optionsList = new HashSet<String>();
+ for (Option option : commandLine.getOptions()) {
+ optionsList.add(option.getOpt());
+ }
+
String recipeName = commandLine.getOptionValue(RECIPE_NAME);
String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
@@ -1022,8 +1037,12 @@ public class FalconCLI {
validateNotEmpty(recipeName, RECIPE_NAME);
validateNotEmpty(recipeOperation, RECIPE_OPERATION);
validateRecipeOperations(recipeOperation);
+ Boolean skipDryRun = null;
+ if (optionsList.contains(SKIPDRYRUN_OPT)) {
+ skipDryRun = true;
+ }
- String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString();
+ String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation, skipDryRun).toString();
OUT.get().println(result);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index bb6d8c9..282b41b 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -48,6 +48,7 @@ public abstract class AbstractFalconClient {
* @return
* @throws FalconCLIException
*/
- public abstract APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException;
+ public abstract APIResult schedule(EntityType entityType, String entityName,
+ String colo, Boolean skipDryRun) throws FalconCLIException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index d9bdf64..44436d2 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -285,41 +285,41 @@ public class FalconClient extends AbstractFalconClient {
return str;
}
- public APIResult schedule(EntityType entityType, String entityName, String colo)
+ public APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun)
throws FalconCLIException {
return sendEntityRequest(Entities.SCHEDULE, entityType, entityName,
- colo);
+ colo, skipDryRun);
}
public APIResult suspend(EntityType entityType, String entityName, String colo)
throws FalconCLIException {
- return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo);
+ return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null);
}
public APIResult resume(EntityType entityType, String entityName, String colo)
throws FalconCLIException {
- return sendEntityRequest(Entities.RESUME, entityType, entityName, colo);
+ return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null);
}
public APIResult delete(EntityType entityType, String entityName)
throws FalconCLIException {
- return sendEntityRequest(Entities.DELETE, entityType, entityName, null);
+ return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null);
}
- public APIResult validate(String entityType, String filePath)
+ public APIResult validate(String entityType, String filePath, Boolean skipDryRun)
throws FalconCLIException {
InputStream entityStream = getServletInputStream(filePath);
return sendEntityRequestWithObject(Entities.VALIDATE, entityType,
- entityStream, null);
+ entityStream, null, skipDryRun);
}
public APIResult submit(String entityType, String filePath)
@@ -327,14 +327,17 @@ public class FalconClient extends AbstractFalconClient {
InputStream entityStream = getServletInputStream(filePath);
return sendEntityRequestWithObject(Entities.SUBMIT, entityType,
- entityStream, null);
+ entityStream, null, null);
}
- public APIResult update(String entityType, String entityName, String filePath)
+ public APIResult update(String entityType, String entityName, String filePath, Boolean skipDryRun)
throws FalconCLIException {
InputStream entityStream = getServletInputStream(filePath);
Entities operation = Entities.UPDATE;
WebResource resource = service.path(operation.path).path(entityType).path(entityName);
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -343,18 +346,18 @@ public class FalconClient extends AbstractFalconClient {
return parseAPIResult(clientResponse);
}
- public APIResult submitAndSchedule(String entityType, String filePath)
+ public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun)
throws FalconCLIException {
InputStream entityStream = getServletInputStream(filePath);
return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE,
- entityType, entityStream, null);
+ entityType, entityStream, null, skipDryRun);
}
public APIResult getStatus(EntityType entityType, String entityName, String colo)
throws FalconCLIException {
- return sendEntityRequest(Entities.STATUS, entityType, entityName, colo);
+ return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null);
}
public Entity getDefinition(String entityType, String entityName)
@@ -400,12 +403,16 @@ public class FalconClient extends AbstractFalconClient {
orderBy, sortOrder, offset, numResults, numInstances);
}
- public APIResult touch(String entityType, String entityName, String colo) throws FalconCLIException {
+ public APIResult touch(String entityType, String entityName,
+ String colo, Boolean skipDryRun) throws FalconCLIException {
Entities operation = Entities.TOUCH;
WebResource resource = service.path(operation.path).path(entityType).path(entityName);
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -604,13 +611,17 @@ public class FalconClient extends AbstractFalconClient {
}
private APIResult sendEntityRequest(Entities entities, EntityType entityType,
- String entityName, String colo) throws FalconCLIException {
+ String entityName, String colo, Boolean skipDryRun) throws FalconCLIException {
WebResource resource = service.path(entities.path)
.path(entityType.toString().toLowerCase()).path(entityName);
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
+
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -731,13 +742,16 @@ public class FalconClient extends AbstractFalconClient {
return parseEntityList(clientResponse);
}
- private APIResult sendEntityRequestWithObject(Entities entities, String entityType,
- Object requestObject, String colo) throws FalconCLIException {
+ private APIResult sendEntityRequestWithObject(Entities entities, String entityType, Object requestObject,
+ String colo, Boolean skipDryRun) throws FalconCLIException {
WebResource resource = service.path(entities.path)
.path(entityType);
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -962,9 +976,8 @@ public class FalconClient extends AbstractFalconClient {
return sendMetadataLineageRequest(MetadataOperations.EDGES, id);
}
- public APIResult submitRecipe(String recipeName,
- String recipeToolClassName,
- final String recipeOperation) throws FalconCLIException {
+ public APIResult submitRecipe(String recipeName, String recipeToolClassName,
+ final String recipeOperation, Boolean skipDryRun) throws FalconCLIException {
String recipePath = clientProperties.getProperty("falcon.recipe.path");
if (StringUtils.isEmpty(recipePath)) {
@@ -1010,8 +1023,8 @@ public class FalconClient extends AbstractFalconClient {
} else {
RecipeTool.main(args);
}
- validate(EntityType.PROCESS.toString(), processFile);
- return submitAndSchedule(EntityType.PROCESS.toString(), processFile);
+ validate(EntityType.PROCESS.toString(), processFile, skipDryRun);
+ return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun);
} catch (Exception e) {
throw new FalconCLIException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 07fafb5..4d45cc7 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -49,7 +49,7 @@ public abstract class AbstractWorkflowEngine {
public abstract boolean isAlive(Cluster cluster) throws FalconException;
- public abstract void schedule(Entity entity) throws FalconException;
+ public abstract void schedule(Entity entity, Boolean skipDryRun) throws FalconException;
public abstract String suspend(Entity entity) throws FalconException;
@@ -61,7 +61,7 @@ public abstract class AbstractWorkflowEngine {
public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException;
- public abstract void dryRun(Entity entity, String clusterName) throws FalconException;
+ public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException;
public abstract boolean isActive(Entity entity) throws FalconException;
@@ -88,9 +88,10 @@ public abstract class AbstractWorkflowEngine {
public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
List<LifeCycle> lifeCycles) throws FalconException;
- public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException;
+ public abstract String update(Entity oldEntity, Entity newEntity,
+ String cluster, Boolean skipDryRun) throws FalconException;
- public abstract String touch(Entity entity, String cluster) throws FalconException;
+ public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException;
public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 2f3dc6f..7e6cd6c 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -112,6 +112,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
+ private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun";
private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100; // milliseconds
private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
@@ -142,7 +143,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public void schedule(Entity entity) throws FalconException {
+ public void schedule(Entity entity, Boolean skipDryRun) throws FalconException {
Map<String, BundleJob> bundleMap = findLatestBundle(entity);
List<String> schedClusters = new ArrayList<String>();
for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
@@ -168,7 +169,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
//Do dryRun of coords before schedule as schedule is asynchronous
- dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun);
scheduleEntity(clusterName, properties, entity);
}
}
@@ -196,18 +197,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public void dryRun(Entity entity, String clusterName) throws FalconException {
+ public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis());
Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
Properties props = builder.build(cluster, buildPath);
if (props != null) {
- dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun);
}
}
+ private void dryRunInternal(Cluster cluster, Path buildPath, Boolean skipDryRun) throws FalconException {
+ if (null != skipDryRun && skipDryRun) {
+ LOG.info("Skipping dryrun as directed by param in cli/RestApi.");
+ return;
+ } else {
+ String skipDryRunStr = RuntimeProperties.get().getProperty(FALCON_SKIP_DRYRUN, "false").toLowerCase();
+ if (Boolean.valueOf(skipDryRunStr)) {
+ LOG.info("Skipping dryrun as directed by Runtime properties.");
+ return;
+ }
+ }
- private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException {
BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath);
OozieClient client = OozieClientFactory.get(cluster.getName());
for (COORDINATOR coord : bundle.getCoordinator()) {
@@ -322,7 +333,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
return Collections.max(bundles, new Comparator<BundleJob>() {
- @Override public int compare(BundleJob o1, BundleJob o2) {
+ @Override
+ public int compare(BundleJob o1, BundleJob o2) {
return o1.getCreatedTime().compareTo(o2.getCreatedTime());
}
});
@@ -683,7 +695,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
for (WorkflowAction action : wfActions) {
if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId())) {
// if the action is sub-workflow, get job urls of all actions within the sub-workflow
- List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster, action.getExternalId()).getActions();
+ List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster,
+ action.getExternalId()).getActions();
for (WorkflowAction subWfAction : subWorkFlowActions) {
if (!subWfAction.getType().startsWith(":")) {
InstancesResult.InstanceAction instanceAction =
@@ -1039,7 +1052,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
+ public String update(Entity oldEntity, Entity newEntity,
+ String cluster, Boolean skipDryRun) throws FalconException {
BundleJob bundle = findLatestBundle(oldEntity, cluster);
boolean entityUpdated = false;
@@ -1068,26 +1082,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return getUpdateString(newEntity, new Date(), bundle, bundle);
}
- LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle
- .getId());
+ LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
+ newEntity.toShortString(), cluster, bundle.getId());
result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle,
- CurrentUser.getUser())).append("\n");
+ CurrentUser.getUser(), skipDryRun)).append("\n");
LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
bundle.getId());
}
- result.append(updateDependents(clusterEntity, oldEntity, newEntity));
+ result.append(updateDependents(clusterEntity, oldEntity, newEntity, skipDryRun));
return result.toString();
}
@Override
- public String touch(Entity entity, String cluster) throws FalconException {
+ public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException {
BundleJob bundle = findLatestBundle(entity, cluster);
Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
StringBuilder result = new StringBuilder();
if (bundle != MISSING) {
- LOG.info("Updating entity {} for cluster: {}, bundle: {}", entity.toShortString(), cluster, bundle.getId());
- String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser());
+ LOG.info("Updating entity {} for cluster: {}, bundle: {}",
+ entity.toShortString(), cluster, bundle.getId());
+ String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser(), skipDryRun);
result.append(output).append("\n");
LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(), cluster,
bundle.getId());
@@ -1124,7 +1139,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return builder.toString();
}
- private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity) throws FalconException {
+ private String updateDependents(Cluster cluster, Entity oldEntity,
+ Entity newEntity, Boolean skipDryRun) throws FalconException {
//Update affected entities
Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity);
StringBuilder result = new StringBuilder();
@@ -1147,7 +1163,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle,
- affectedProcBundle.getUser())).append("\n");
+ affectedProcBundle.getUser(), skipDryRun)).append("\n");
LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
affectedEntity.toShortString(), cluster, affectedProcBundle.getId());
}
@@ -1178,7 +1194,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return null;
}
- private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime) throws FalconException {
+ private void updateCoords(String cluster, BundleJob bundle,
+ int concurrency, Date endTime) throws FalconException {
if (endTime.compareTo(now()) <= 0) {
throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
}
@@ -1217,14 +1234,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
- String user) throws FalconException {
+ String user, Boolean skipDryRun) throws FalconException {
String clusterName = cluster.getName();
Date effectiveTime = getEffectiveTime(cluster, newEntity);
LOG.info("Effective time " + effectiveTime);
//Validate that new entity can be scheduled
- dryRunForUpdate(cluster, newEntity, effectiveTime);
+ dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
@@ -1256,11 +1273,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
}
- private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime) throws FalconException {
+ private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime,
+ Boolean skipDryRun) throws FalconException {
Entity clone = entity.copy();
EntityUtil.setStartDate(clone, cluster.getName(), startTime);
try {
- dryRun(clone, cluster.getName());
+ dryRun(clone, cluster.getName(), skipDryRun);
} catch (FalconException e) {
throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", e);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index f2f9826..78964dd 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -206,7 +206,7 @@ public abstract class AbstractEntityManager {
* @param type entity type
* @return APIResule -Succeeded or Failed
*/
- public APIResult validate(HttpServletRequest request, String type) {
+ public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) {
try {
EntityType entityType = EntityType.getEnum(type);
Entity entity = deserializeEntity(request, entityType);
@@ -217,7 +217,7 @@ public abstract class AbstractEntityManager {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
try {
- getWorkflowEngine().dryRun(entity, cluster);
+ getWorkflowEngine().dryRun(entity, cluster, skipDryRun);
} catch (FalconException e) {
throw new FalconException("dryRun failed on cluster " + cluster, e);
}
@@ -267,7 +267,8 @@ public abstract class AbstractEntityManager {
}
}
- public APIResult update(HttpServletRequest request, String type, String entityName, String colo) {
+ public APIResult update(HttpServletRequest request, String type, String entityName,
+ String colo, Boolean skipDryRun) {
checkColo(colo);
List<Entity> tokenList = null;
try {
@@ -292,7 +293,7 @@ public abstract class AbstractEntityManager {
oldClusters.removeAll(newClusters); //deleted clusters
for (String cluster : newClusters) {
- result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster));
+ result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, skipDryRun));
}
for (String cluster : oldClusters) {
getWorkflowEngine().delete(oldEntity, cluster);
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index e38749a..5b415a2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -66,10 +66,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
public APIResult schedule(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entity,
- @Dimension("colo") @PathParam("colo") String colo) {
+ @Dimension("colo") @PathParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
checkColo(colo);
try {
- scheduleInternal(type, entity);
+ scheduleInternal(type, entity, skipDryRun);
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled successfully");
} catch (Throwable e) {
LOG.error("Unable to schedule workflow", e);
@@ -77,7 +78,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
}
}
- private synchronized void scheduleInternal(String type, String entity)
+ private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun)
throws FalconException, AuthorizationException {
checkSchedulableEntity(type);
@@ -90,7 +91,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
+ entityObj.toShortString());
}
LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
- getWorkflowEngine().schedule(entityObj);
+ getWorkflowEngine().schedule(entityObj, skipDryRun);
} catch (Exception e) {
throw new FalconException("Entity schedule failed for " + type + ": " + entity, e);
} finally {
@@ -110,12 +111,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
*/
public APIResult submitAndSchedule(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
- @Dimension("colo") @PathParam("colo") String colo) {
+ @Dimension("colo") @PathParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
checkColo(colo);
try {
checkSchedulableEntity(type);
Entity entity = submitInternal(request, type);
- scheduleInternal(type, entity.getName());
+ scheduleInternal(type, entity.getName(), skipDryRun);
return new APIResult(APIResult.Status.SUCCEEDED,
entity.getName() + "(" + type + ") scheduled successfully");
} catch (Throwable e) {
@@ -265,14 +267,15 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
*/
public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entityName,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
checkColo(colo);
StringBuilder result = new StringBuilder();
try {
Entity entity = EntityUtil.getEntity(type, entityName);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
for (String cluster : clusters) {
- result.append(getWorkflowEngine().touch(entity, cluster));
+ result.append(getWorkflowEngine().touch(entity, cluster, skipDryRun));
}
} catch (Throwable e) {
LOG.error("Touch failed", e);
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index d22e8a3..ceabb06 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -174,7 +174,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
@Override
- public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type) {
+ public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
EntityType entityType = EntityType.getEnum(type);
final Entity entity;
@@ -192,7 +193,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("validate", bufferedRequest, type);
+ return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun);
}
}.execute();
}
@@ -245,7 +246,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
public APIResult update(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entityName,
- @Dimension("colo") @QueryParam("colo") String ignore) {
+ @Dimension("colo") @QueryParam("colo") String ignore,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
final Set<String> oldColos = getApplicableColos(type, entityName);
@@ -281,7 +283,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo);
+ return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName,
+ colo, skipDryRun);
}
}.execute());
}
@@ -308,7 +311,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
// update only if all are updated
if (!embeddedMode && result) {
- results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo));
+ results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, skipDryRun));
}
return consolidateResult(results, APIResult.class);
@@ -322,7 +325,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
public APIResult touch(
@Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entityName,
- @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+ @Dimension("colo") @QueryParam("colo") final String coloExpr,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
final Set<String> colosFromExp = getColosFromExpression(coloExpr, type, entityName);
return new EntityProxy(type, entityName) {
@Override
@@ -332,7 +336,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("touch", type, entityName, colo);
+ return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
}
}.execute();
}
@@ -384,7 +388,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
public APIResult schedule(@Context final HttpServletRequest request,
@Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+ @Dimension("colo") @QueryParam("colo") final String coloExpr,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
return new EntityProxy(type, entity) {
@@ -395,7 +400,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
protected APIResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo);
+ return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun);
}
}.execute();
}
@@ -408,12 +413,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
public APIResult submitAndSchedule(
@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
- @Dimension("colo") @QueryParam("colo") String coloExpr) {
+ @Dimension("colo") @QueryParam("colo") String coloExpr,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
BufferedRequest bufferedRequest = new BufferedRequest(request);
String entity = getEntity(bufferedRequest, type).getName();
Map<String, APIResult> results = new HashMap<String, APIResult>();
results.put("submit", submit(bufferedRequest, type, coloExpr));
- results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr));
+ results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun));
return consolidateResult(results, APIResult.class);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index be1fe1f..cce8737 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -95,8 +95,7 @@ public class EntityManagerTest extends AbstractEntityManager {
invalidProcessXML);
try {
- validate(mockHttpServletRequest,
- EntityType.PROCESS.name());
+ validate(mockHttpServletRequest, EntityType.PROCESS.name(), false);
Assert.fail("Invalid entity type was accepted by the system");
} catch (FalconWebException ignore) {
// ignore
@@ -110,8 +109,7 @@ public class EntityManagerTest extends AbstractEntityManager {
invalidProcessXML);
try {
- validate(mockHttpServletRequest,
- "InvalidEntityType");
+ validate(mockHttpServletRequest, "InvalidEntityType", false);
Assert.fail("Invalid entity type was accepted by the system");
} catch (FalconWebException ignore) {
// ignore
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index e898fc3..eb65cb3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -118,8 +118,9 @@ public class FalconUnitClient extends AbstractFalconClient {
* @throws FalconException
*/
@Override
- public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconCLIException {
- return schedule(entityType, entityName, null, 0, cluster);
+ public APIResult schedule(EntityType entityType, String entityName,
+ String cluster, Boolean skipDryRun) throws FalconCLIException {
+ return schedule(entityType, entityName, null, 0, cluster, skipDryRun);
}
@@ -133,7 +134,7 @@ public class FalconUnitClient extends AbstractFalconClient {
* @return boolean
*/
public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances,
- String cluster) throws FalconCLIException {
+ String cluster, Boolean skipDryRun) throws FalconCLIException {
try {
FalconUnitHelper.checkSchedulableEntity(entityType.toString());
Entity entity = EntityUtil.getEntity(entityType, entityName);
@@ -146,7 +147,7 @@ public class FalconUnitClient extends AbstractFalconClient {
if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) {
updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
}
- workflowEngine.schedule(entity);
+ workflowEngine.schedule(entity, skipDryRun);
LOG.info(entityName + " is scheduled successfully");
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully");
} catch (FalconException e) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 9f00d94..997b301 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -147,7 +147,7 @@ public class FalconUnitTestBase {
}
public APIResult scheduleProcess(String processName, String startTime, int numInstances,
- String cluster, String localWfPath) throws FalconException,
+ String cluster, String localWfPath, Boolean skipDryRun) throws FalconException,
IOException, FalconCLIException {
Process processEntity = configStore.get(EntityType.PROCESS, processName);
if (processEntity == null) {
@@ -155,16 +155,16 @@ public class FalconUnitTestBase {
}
String workflowPath = processEntity.getWorkflow().getPath();
fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath));
- return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster);
+ return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun);
}
public APIResult scheduleProcess(String processName, String startTime, int numInstances,
- String cluster) throws FalconException, FalconCLIException {
+ String cluster, Boolean skipDryRun) throws FalconException, FalconCLIException {
Process processEntity = configStore.get(EntityType.PROCESS, processName);
if (processEntity == null) {
throw new FalconException("Process not found " + processName);
}
- return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster);
+ return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun);
}
private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 57b7b1b..498f50e 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -44,7 +44,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
createData("in", "local", scheduleTime, "input.txt");
result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr");
assertStatus(result);
- result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"));
+ result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), true);
assertStatus(result);
waitForStatus(EntityType.PROCESS, "process", scheduleTime);
InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS,
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index 3bd625c..bf538dc 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -70,7 +70,8 @@ public class ConfigSyncService extends AbstractEntityManager {
public APIResult update(@Context HttpServletRequest request,
@Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entityName,
- @Dimension("colo") @QueryParam("colo") String colo) {
- return super.update(request, type, entityName, colo);
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
+ return super.update(request, type, entityName, colo, skipDryRun);
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index a2af0cd..1f8cc1b 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -126,8 +126,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
public APIResult schedule(@Context HttpServletRequest request,
@Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entity,
- @Dimension("colo") @QueryParam("colo") String colo) {
- return super.schedule(request, type, entity, colo);
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
+ return super.schedule(request, type, entity, colo, skipDryRun);
}
@POST
@@ -160,8 +161,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
@Monitored(event = "validate")
@Override
- public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type) {
- return super.validate(request, type);
+ public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
+ return super.validate(request, type, skipDryRun);
}
@POST
@@ -171,8 +173,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@Override
public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
@Dimension("entityName") @PathParam("entity") String entityName,
- @Dimension("colo") @QueryParam("colo") String colo) {
- return super.touch(type, entityName, colo);
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @QueryParam("skipDryRun") Boolean skipDryRun) {
+ return super.touch(type, entityName, colo, skipDryRun);
}
@GET
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index e328d69..0062070 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -184,6 +184,33 @@ public class FalconCLIIT {
}
+ public void testSkipDryRunValidCommands() throws Exception {
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+ submitTestFiles(context, overlay);
+
+ Assert.assertEquals(
+ executeWithURL("entity -schedule -skipDryRun -type cluster -name " + overlay.get("cluster")), -1);
+
+ Assert.assertEquals(
+ executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0);
+
+ Assert.assertEquals(
+ executeWithURL("entity -schedule -type process -skipDryRun -name " + overlay.get("processName")), 0);
+
+ Assert.assertEquals(0,
+ executeWithURL("entity -touch -skipDryRun -name " + overlay.get("processName") + " -type process"));
+
+ String filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(
+ executeWithURL("entity -submitAndSchedule -skipDryRun -type feed -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(
+ executeWithURL("entity -validate -skipDryRun -type process -file " + filePath), 0);
+
+ }
+
public void testSuspendResumeStatusEntityValidCommands() throws Exception {
TestContext context = new TestContext();
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index c602ffb..f0cee61 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -131,7 +131,8 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(response);
}
- private ClientResponse update(TestContext context, Entity entity, Date endTime) throws Exception {
+ private ClientResponse update(TestContext context, Entity entity,
+ Date endTime, Boolean skipDryRun) throws Exception {
File tmpFile = TestContext.getTempFile();
entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
WebResource resource = context.service.path("api/entities/update/"
@@ -139,19 +140,24 @@ public class EntityManagerJerseyIT {
if (endTime != null) {
resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime));
}
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
return resource.header("Cookie", context.getAuthenticationToken())
.accept(MediaType.TEXT_XML)
.post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
}
- private ClientResponse touch(TestContext context, Entity entity) {
+ private ClientResponse touch(TestContext context, Entity entity, Boolean skipDryRun) {
WebResource resource = context.service.path("api/entities/touch/"
+ entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
- ClientResponse clientResponse = resource
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
+ return resource
.header("Cookie", context.getAuthenticationToken())
.accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
.post(ClientResponse.class);
- return clientResponse;
}
@Test
@@ -174,7 +180,7 @@ public class EntityManagerJerseyIT {
//change output feed path and update feed as another user
feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
- ClientResponse response = update(context, feed, null);
+ ClientResponse response = update(context, feed, null, false);
context.assertSuccessful(response);
bundles = OozieTestUtils.getBundles(context);
@@ -222,7 +228,7 @@ public class EntityManagerJerseyIT {
}
public void testDryRun() throws Exception {
- //Schedule of invalid process should fail because of dryRun
+ //Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped
TestContext context = newContext();
Map<String, String> overlay = context.getUniqueOverlay();
String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
@@ -237,7 +243,7 @@ public class EntityManagerJerseyIT {
ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
context.assertFailure(response);
- context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false);
+ context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false, null);
//Fix the process and then submitAndSchedule should succeed
Iterator<Property> itr = process.getProperties().getProperties().iterator();
@@ -256,8 +262,13 @@ public class EntityManagerJerseyIT {
//Update with invalid property should fail again
process.getProperties().getProperties().add(prop);
updateEndtime(process);
- response = update(context, process, null);
+ response = update(context, process, null, null);
context.assertFailure(response);
+
+ // update where dryrun is disabled should succeed.
+ response = update(context, process, null, true);
+ context.assertSuccessful(response);
+
}
@Test
@@ -274,7 +285,7 @@ public class EntityManagerJerseyIT {
process.getProperties().getProperties().get(0).setName("newprop");
Date endTime = getEndTime();
process.getClusters().getClusters().get(0).getValidity().setEnd(endTime);
- response = update(context, process, endTime);
+ response = update(context, process, endTime, null);
context.assertSuccessful(response);
//Since the process endtime = update effective time, it shouldn't create new bundle
@@ -315,7 +326,7 @@ public class EntityManagerJerseyIT {
updateEndtime(process);
Date endTime = getEndTime();
- response = update(context, process, endTime);
+ response = update(context, process, endTime, null);
context.assertSuccessful(response);
//Assert that update creates new bundle and old coord is running
@@ -349,7 +360,7 @@ public class EntityManagerJerseyIT {
Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
updateEndtime(process);
- ClientResponse response = update(context, process, null);
+ ClientResponse response = update(context, process, null, null);
context.assertSuccessful(response);
//Assert that update does not create new bundle
@@ -371,13 +382,13 @@ public class EntityManagerJerseyIT {
//Update end time of process required for touch
Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
updateEndtime(process);
- ClientResponse response = update(context, process, null);
+ ClientResponse response = update(context, process, null, null);
context.assertSuccessful(response);
bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
//Calling force update
- response = touch(context, process);
+ response = touch(context, process, true);
context.assertSuccessful(response);
OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING);
@@ -871,7 +882,7 @@ public class EntityManagerJerseyIT {
Date endTime = getEndTime();
ExecutorService service = Executors.newSingleThreadExecutor();
Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime));
- response = update(context, process, endTime);
+ response = update(context, process, endTime, false);
ClientResponse duplicateUpdateThreadResponse = future.get();
// since there are duplicate threads for updates, there is no guarantee which request will succeed
@@ -918,7 +929,7 @@ public class EntityManagerJerseyIT {
@Override
public ClientResponse call() throws Exception {
- return update(context, process, endTime);
+ return update(context, process, endTime, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 7b227b3..4a25b88 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -230,10 +230,11 @@ public class TestContext {
}
public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception {
- scheduleProcess(processTemplate, overlay, true);
+ scheduleProcess(processTemplate, overlay, true, null);
}
- public void scheduleProcess(String processTemplate, Map<String, String> overlay, boolean succeed) throws Exception {
+ public void scheduleProcess(String processTemplate, Map<String, String> overlay,
+ boolean succeed, Boolean skipDryRun) throws Exception {
ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
assertSuccessful(response);
@@ -243,7 +244,7 @@ public class TestContext {
response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
assertSuccessful(response);
- response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS);
+ response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun);
if (succeed) {
assertSuccessful(response);
} else {
@@ -278,10 +279,20 @@ public class TestContext {
public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType)
throws Exception {
+ return submitAndSchedule(template, overlay, entityType, null);
+ }
+
+ public ClientResponse submitAndSchedule(String template, Map<String, String> overlay,
+ EntityType entityType, Boolean skipDryRun)
+ throws Exception {
String tmpFile = overlayParametersOverTemplate(template, overlay);
ServletInputStream rawlogStream = getServletInputStream(tmpFile);
- return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase())
+ WebResource resource = service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase());
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
+ return resource
.header("Cookie", getAuthenticationToken())
.accept(MediaType.TEXT_XML)
.type(MediaType.TEXT_XML)
@@ -290,10 +301,20 @@ public class TestContext {
public ClientResponse validate(String template, Map<String, String> overlay, EntityType entityType)
throws Exception {
+ return validate(template, overlay, entityType, null);
+ }
+
+ public ClientResponse validate(String template, Map<String, String> overlay,
+ EntityType entityType, Boolean skipDryRun)
+ throws Exception {
String tmpFile = overlayParametersOverTemplate(template, overlay);
ServletInputStream rawlogStream = getServletInputStream(tmpFile);
- return this.service.path("api/entities/validate/" + entityType.name().toLowerCase())
+ WebResource resource = service.path("api/entities/validate/" + entityType.name().toLowerCase());
+ if (null != skipDryRun) {
+ resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+ }
+ return resource
.header("Cookie", getAuthenticationToken())
.accept(MediaType.TEXT_XML)
.type(MediaType.TEXT_XML)
[3/3] falcon git commit: FALCON-1038 Log mover fails for map-reduce
action. Contributed by Peeyush Bishnoi.
Posted by aj...@apache.org.
FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fbb4d314
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fbb4d314
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fbb4d314
Branch: refs/heads/0.7
Commit: fbb4d314297e10b723e7acf53a50426e50333037
Parents: 17e2f71
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 14:00:37 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:45 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/logging/DefaultTaskLogRetriever.java | 2 +-
.../org/apache/falcon/logging/JobLogMover.java | 47 ++++++++++++--------
.../falcon/logging/TaskLogRetrieverYarn.java | 11 ++++-
4 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1054fe..f7e9127 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@ Trunk (Unreleased)
(Suhas Vasu)
BUG FIXES
+ FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava)
+
FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava)
FALCON-1409 Update API throws NullPointerException(Sandeep Samudrala via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index 962f891..82448d8 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -61,7 +61,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
}
}
- protected List<String> getFromHistory(String jodId) throws IOException {
+ protected List<String> getFromHistory(String jobId) throws IOException {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index ba669c8..478d68c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -91,17 +91,26 @@ public class JobLogMover {
}
}
} else {
- // if process wf with oozie engine
- String subflowId = jobInfo.getExternalId();
- copyOozieLog(client, fs, path, subflowId);
- WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+ String flowId;
+ // if process wf with pig, hive
+ if (context.getUserWorkflowEngine().equals("pig")
+ ||context.getUserWorkflowEngine().equals("hive")) {
+ flowId = jobInfo.getId();
+ } else {
+ // if process wf with oozie engine
+ flowId = jobInfo.getExternalId();
+ }
+ copyOozieLog(client, fs, path, flowId);
+ WorkflowJob subflowInfo = client.getJobInfo(flowId);
List<WorkflowAction> actions = subflowInfo.getActions();
for (WorkflowAction action : actions) {
- if (action.getType().equals("pig")
- || action.getType().equals("java")) {
+ if (isActionTypeSupported(action)) {
+ LOG.info("Copying hadoop TT log for action: {} of type: {}",
+ action.getName(), action.getType());
copyTTlogs(fs, path, action);
} else {
- LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
+ LOG.info("Ignoring hadoop TT log for non supported action: {} of type: {}",
+ action.getName(), action.getType());
}
}
}
@@ -114,8 +123,8 @@ public class JobLogMover {
}
private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
- // userWorkflowEngine will be null for replication and "pig" for pig
- return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+ // userWorkflowEngine will be null for replication and "not null" for pig, hive, oozie
+ return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) == null;
}
private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
@@ -134,7 +143,7 @@ public class JobLogMover {
for (String ttLogURL : ttLogUrls) {
LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
InputStream in = getURLinputStream(new URL(ttLogURL));
- OutputStream out = fs.create(new Path(path, action.getName() + "_"
+ OutputStream out = fs.create(new Path(path, action.getName() + "_" + action.getType() + "_"
+ getMappedStatus(action.getStatus()) + "-" + index + ".log"));
IOUtils.copyBytes(in, out, 4096, true);
LOG.info("Copied log to {}", path);
@@ -143,6 +152,13 @@ public class JobLogMover {
}
}
+ private boolean isActionTypeSupported(WorkflowAction action) {
+ return action.getType().equals("pig")
+ || action.getType().equals("hive")
+ || action.getType().equals("java")
+ || action.getType().equals("map-reduce");
+ }
+
private String getMappedStatus(WorkflowAction.Status status) {
if (status == WorkflowAction.Status.FAILED
|| status == WorkflowAction.Status.KILLED
@@ -161,14 +177,9 @@ public class JobLogMover {
@SuppressWarnings("unchecked")
private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) {
- try {
- if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
- return TaskLogRetrieverYarn.class;
- }
- return (Class<? extends TaskLogURLRetriever>)
- Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
- } catch (ClassNotFoundException e) {
- LOG.warn("V1 Retriever missing, falling back to Default retriever");
+ if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+ return TaskLogRetrieverYarn.class;
+ } else {
return DefaultTaskLogRetriever.class;
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
index 61c5afb..146d53c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -48,13 +48,22 @@ public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
LOG.warn("External id for workflow action is null");
return null;
}
+
+ if (conf.get(YARN_LOG_SERVER_URL) == null) {
+ LOG.warn("YARN log Server is null");
+ return null;
+ }
+
try {
Job job = cluster.getJob(jobID);
if (job != null) {
TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
for (TaskCompletionEvent event : events) {
LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
- String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/"
+ String url = (conf.get(YARN_LOG_SERVER_URL).startsWith(SCHEME)
+ ? conf.get(YARN_LOG_SERVER_URL)
+ : SCHEME + conf.get(YARN_LOG_SERVER_URL))
+ + "/"
+ event.getTaskTrackerHttp() + "/"
+ params.getContainerId() + "/"
+ params.getApplicationId() + "/"