Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/25 13:51:20 UTC

[1/3] falcon git commit: FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted. Contributed by Paul Isaychuk.

Repository: falcon
Updated Branches:
  refs/heads/0.7 44ca0bdbc -> fbb4d3142


FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted. Contributed by Paul Isaychuk.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ba83ad21
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ba83ad21
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ba83ad21

Branch: refs/heads/0.7
Commit: ba83ad21fd361cd8f40c68184587307b4c8d9150
Parents: 44ca0bd
Author: Paul Isaychuk <pi...@apache.org>
Authored: Fri Aug 21 12:34:33 2015 +0300
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:30 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                                 | 2 ++
 .../org/apache/falcon/regression/prism/RetentionTest.java     | 7 +++++--
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
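
The gist of the change: the test now seeds only past-dated data (freqType.addTime(today, -36) through
freqType.addTime(today, -1)), so every instance is a candidate for retention, and it then asserts that
the feed's base data directory itself survives the retention workflow. A minimal standalone sketch of
that final check, assuming a reachable HDFS; the directory path below is hypothetical, the real test
uses its own testHDFSDir:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class RetentionDirCheck {
        public static void main(String[] args) throws Exception {
            // Hypothetical base feed directory; retention may delete expired
            // instance sub-directories underneath it, but the directory itself
            // must still exist once the workflow finishes.
            Path baseDataDir = new Path("/tmp/falcon-regression/RetentionTest/testFolder");
            FileSystem fs = FileSystem.get(new Configuration());
            if (!fs.exists(baseDataDir)) {
                throw new AssertionError("Base data directory should be present after retention.");
            }
            System.out.println("Base data directory is intact: " + baseDataDir);
        }
    }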


http://git-wip-us.apache.org/repos/asf/falcon/blob/ba83ad21/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 7ad0f3d..af5e660 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1382 Add a test for feed retention to make sure that data directory is not deleted (Paul Isaychuk)
+
    FALCON-1321 Add Entity Lineage Test (Pragya Mittal via Ajay Yadava)
 
    FALCON-1319 Contribute HiveDr, Mirror tests and some test fixes (Namit Maheshwari, Paul Isaychuk,

http://git-wip-us.apache.org/repos/asf/falcon/blob/ba83ad21/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 628dca1..8f45d1c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -37,6 +37,7 @@ import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.OozieClient;
@@ -141,7 +142,7 @@ public class RetentionTest extends BaseTestClass {
         }
         final DateTime today = new DateTime(DateTimeZone.UTC);
         final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
-            freqType.addTime(today, -36), freqType.addTime(today, 36), skip, freqType);
+            freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
         final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
         LOGGER.info("dataDates = " + dataDates);
         dataDates.add(HadoopUtil.SOMETHING_RANDOM);
@@ -206,6 +207,9 @@ public class RetentionTest extends BaseTestClass {
 
         Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
             expectedOutput.toArray(new String[expectedOutput.size()])));
+
+        //check that root directory exists
+        Assert.assertTrue(clusterFS.exists(new Path(testHDFSDir)), "Base data directory should be present.");
     }
 
     /**
@@ -273,7 +277,6 @@ public class RetentionTest extends BaseTestClass {
         return finalData;
     }
 
-
     /**
      * Provides different sets of parameters for retention workflow.
      */


[2/3] falcon git commit: FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki.

Posted by aj...@apache.org.
FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity. Contributed by Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/17e2f71c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/17e2f71c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/17e2f71c

Branch: refs/heads/0.7
Commit: 17e2f71cd7b1a2681abdec758cc5327dc4c543ea
Parents: ba83ad2
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 12:41:35 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:45 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/falcon/cli/FalconCLI.java   | 33 ++++++++---
 .../falcon/client/AbstractFalconClient.java     |  3 +-
 .../org/apache/falcon/client/FalconClient.java  | 55 ++++++++++-------
 .../workflow/engine/AbstractWorkflowEngine.java |  9 +--
 .../workflow/engine/OozieWorkflowEngine.java    | 62 +++++++++++++-------
 .../falcon/resource/AbstractEntityManager.java  |  9 +--
 .../AbstractSchedulableEntityManager.java       | 19 +++---
 .../proxy/SchedulableEntityManagerProxy.java    | 28 +++++----
 .../falcon/resource/EntityManagerTest.java      |  6 +-
 .../apache/falcon/unit/FalconUnitClient.java    |  9 +--
 .../apache/falcon/unit/FalconUnitTestBase.java  |  8 +--
 .../org/apache/falcon/unit/TestFalconUnit.java  |  2 +-
 .../falcon/resource/ConfigSyncService.java      |  5 +-
 .../resource/SchedulableEntityManager.java      | 15 +++--
 .../java/org/apache/falcon/cli/FalconCLIIT.java | 27 +++++++++
 .../falcon/resource/EntityManagerJerseyIT.java  | 41 ++++++++-----
 .../org/apache/falcon/resource/TestContext.java | 31 ++++++++--
 18 files changed, 245 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
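
The new option surfaces as -skipDryRun on the entity and recipe commands and as a skipDryRun query
parameter on the matching REST endpoints; when it is absent the client sends nothing and the server
falls back to its runtime default. Illustrative command lines only (entity name and file path are
placeholders), assuming the usual falcon entity CLI syntax:

    falcon entity -type process -name sample-process -schedule -skipDryRun
    falcon entity -type process -file /path/to/process.xml -submitAndSchedule -skipDryRun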


http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f31f839..a1054fe 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-1174 Ability to disable oozie dryrun while scheduling or updating the falcon entity(Balu Vellanki via Ajay Yadava)
+
     FALCON-1374 Remove the cap on numResults(Pragya Mittal via Ajay Yadava)
 
     FALCON-1379 Doc describes retention incorrectly(Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 11f6bff..11dfe72 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -90,6 +90,7 @@ public class FalconCLI {
     public static final String PATH_OPT = "path";
     public static final String LIST_OPT = "list";
     public static final String TOUCH_OPT = "touch";
+    public static final String SKIPDRYRUN_OPT = "skipDryRun";
 
     public static final String FIELDS_OPT = "fields";
     public static final String FILTER_BY_OPT = "filterBy";
@@ -429,6 +430,11 @@ public class FalconCLI {
         Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
                 null, "numResults");
         Integer numInstances = parseIntegerInput(commandLine.getOptionValue(NUM_INSTANCES_OPT), 7, "numInstances");
+        Boolean skipDryRun = null;
+        if (optionsList.contains(SKIPDRYRUN_OPT)) {
+            skipDryRun = true;
+        }
+
         EntityType entityTypeEnum = null;
         if (optionsList.contains(LIST_OPT)) {
             if (entityType == null) {
@@ -460,20 +466,19 @@ public class FalconCLI {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            result = client.update(entityType, entityName, filePath).getMessage();
+            result = client.update(entityType, entityName, filePath, skipDryRun).getMessage();
         } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result =
-                    client.submitAndSchedule(entityType, filePath).getMessage();
+            result = client.submitAndSchedule(entityType, filePath, skipDryRun).getMessage();
         } else if (optionsList.contains(VALIDATE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
-            result = client.validate(entityType, filePath).getMessage();
+            result = client.validate(entityType, filePath, skipDryRun).getMessage();
         } else if (optionsList.contains(SCHEDULE_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.schedule(entityTypeEnum, entityName, colo).getMessage();
+            result = client.schedule(entityTypeEnum, entityName, colo, skipDryRun).getMessage();
         } else if (optionsList.contains(SUSPEND_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
@@ -522,7 +527,7 @@ public class FalconCLI {
         } else if (optionsList.contains(TOUCH_OPT)) {
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
             colo = getColo(colo);
-            result = client.touch(entityType, entityName, colo).getMessage();
+            result = client.touch(entityType, entityName, colo, skipDryRun).getMessage();
         } else if (optionsList.contains(HELP_CMD)) {
             OUT.get().println("Falcon Help");
         } else {
@@ -742,6 +747,7 @@ public class FalconCLI {
         Option numInstances = new Option(NUM_INSTANCES_OPT, true,
                 "Number of instances to return per entity summary request");
         Option path = new Option(PATH_OPT, true, "Path for a feed's instance");
+        Option skipDryRun = new Option(SKIPDRYRUN_OPT, false, "skip dry run in workflow engine");
 
         entityOptions.addOption(url);
         entityOptions.addOption(path);
@@ -763,6 +769,7 @@ public class FalconCLI {
         entityOptions.addOption(offset);
         entityOptions.addOption(numResults);
         entityOptions.addOption(numInstances);
+        entityOptions.addOption(skipDryRun);
 
         return entityOptions;
     }
@@ -926,6 +933,9 @@ public class FalconCLI {
         Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
         recipeOptions.addOption(recipeOperation);
 
+        Option skipDryRunOperation = new Option(SKIPDRYRUN_OPT, false, "skip dryrun operation");
+        recipeOptions.addOption(skipDryRunOperation);
+
         return recipeOptions;
     }
 
@@ -1015,6 +1025,11 @@ public class FalconCLI {
     }
 
     private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+        Set<String> optionsList = new HashSet<String>();
+        for (Option option : commandLine.getOptions()) {
+            optionsList.add(option.getOpt());
+        }
+
         String recipeName = commandLine.getOptionValue(RECIPE_NAME);
         String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
         String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
@@ -1022,8 +1037,12 @@ public class FalconCLI {
         validateNotEmpty(recipeName, RECIPE_NAME);
         validateNotEmpty(recipeOperation, RECIPE_OPERATION);
         validateRecipeOperations(recipeOperation);
+        Boolean skipDryRun = null;
+        if (optionsList.contains(SKIPDRYRUN_OPT)) {
+            skipDryRun = true;
+        }
 
-        String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString();
+        String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation, skipDryRun).toString();
         OUT.get().println(result);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
index bb6d8c9..282b41b 100644
--- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java
@@ -48,6 +48,7 @@ public abstract class AbstractFalconClient {
      * @return
      * @throws FalconCLIException
      */
-    public abstract APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException;
+    public abstract APIResult schedule(EntityType entityType, String entityName,
+                                       String colo, Boolean skipDryRun) throws FalconCLIException;
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index d9bdf64..44436d2 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -285,41 +285,41 @@ public class FalconClient extends AbstractFalconClient {
         return str;
     }
 
-    public APIResult schedule(EntityType entityType, String entityName, String colo)
+    public APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun)
         throws FalconCLIException {
 
         return sendEntityRequest(Entities.SCHEDULE, entityType, entityName,
-                colo);
+                colo, skipDryRun);
 
     }
 
     public APIResult suspend(EntityType entityType, String entityName, String colo)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo);
+        return sendEntityRequest(Entities.SUSPEND, entityType, entityName, colo, null);
 
     }
 
     public APIResult resume(EntityType entityType, String entityName, String colo)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo);
+        return sendEntityRequest(Entities.RESUME, entityType, entityName, colo, null);
 
     }
 
     public APIResult delete(EntityType entityType, String entityName)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.DELETE, entityType, entityName, null);
+        return sendEntityRequest(Entities.DELETE, entityType, entityName, null, null);
 
     }
 
-    public APIResult validate(String entityType, String filePath)
+    public APIResult validate(String entityType, String filePath, Boolean skipDryRun)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.VALIDATE, entityType,
-                entityStream, null);
+                entityStream, null, skipDryRun);
     }
 
     public APIResult submit(String entityType, String filePath)
@@ -327,14 +327,17 @@ public class FalconClient extends AbstractFalconClient {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMIT, entityType,
-                entityStream, null);
+                entityStream, null, null);
     }
 
-    public APIResult update(String entityType, String entityName, String filePath)
+    public APIResult update(String entityType, String entityName, String filePath, Boolean skipDryRun)
         throws FalconCLIException {
         InputStream entityStream = getServletInputStream(filePath);
         Entities operation = Entities.UPDATE;
         WebResource resource = service.path(operation.path).path(entityType).path(entityName);
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -343,18 +346,18 @@ public class FalconClient extends AbstractFalconClient {
         return parseAPIResult(clientResponse);
     }
 
-    public APIResult submitAndSchedule(String entityType, String filePath)
+    public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun)
         throws FalconCLIException {
 
         InputStream entityStream = getServletInputStream(filePath);
         return sendEntityRequestWithObject(Entities.SUBMITandSCHEDULE,
-                entityType, entityStream, null);
+                entityType, entityStream, null, skipDryRun);
     }
 
     public APIResult getStatus(EntityType entityType, String entityName, String colo)
         throws FalconCLIException {
 
-        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo);
+        return sendEntityRequest(Entities.STATUS, entityType, entityName, colo, null);
     }
 
     public Entity getDefinition(String entityType, String entityName)
@@ -400,12 +403,16 @@ public class FalconClient extends AbstractFalconClient {
                 orderBy, sortOrder, offset, numResults, numInstances);
     }
 
-    public APIResult touch(String entityType, String entityName, String colo) throws FalconCLIException {
+    public APIResult touch(String entityType, String entityName,
+                           String colo, Boolean skipDryRun) throws FalconCLIException {
         Entities operation = Entities.TOUCH;
         WebResource resource = service.path(operation.path).path(entityType).path(entityName);
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)
@@ -604,13 +611,17 @@ public class FalconClient extends AbstractFalconClient {
     }
 
     private APIResult sendEntityRequest(Entities entities, EntityType entityType,
-                                     String entityName, String colo) throws FalconCLIException {
+                                     String entityName, String colo, Boolean skipDryRun) throws FalconCLIException {
 
         WebResource resource = service.path(entities.path)
                 .path(entityType.toString().toLowerCase()).path(entityName);
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
+
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -731,13 +742,16 @@ public class FalconClient extends AbstractFalconClient {
         return parseEntityList(clientResponse);
     }
 
-    private APIResult sendEntityRequestWithObject(Entities entities, String entityType,
-                                               Object requestObject, String colo) throws FalconCLIException {
+    private APIResult sendEntityRequestWithObject(Entities entities, String entityType, Object requestObject,
+                                                  String colo, Boolean skipDryRun) throws FalconCLIException {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         if (colo != null) {
             resource = resource.queryParam("colo", colo);
         }
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(entities.mimeType).type(MediaType.TEXT_XML)
@@ -962,9 +976,8 @@ public class FalconClient extends AbstractFalconClient {
         return sendMetadataLineageRequest(MetadataOperations.EDGES, id);
     }
 
-    public APIResult submitRecipe(String recipeName,
-                               String recipeToolClassName,
-                               final String recipeOperation) throws FalconCLIException {
+    public APIResult submitRecipe(String recipeName, String recipeToolClassName,
+                                  final String recipeOperation, Boolean skipDryRun) throws FalconCLIException {
         String recipePath = clientProperties.getProperty("falcon.recipe.path");
 
         if (StringUtils.isEmpty(recipePath)) {
@@ -1010,8 +1023,8 @@ public class FalconClient extends AbstractFalconClient {
             } else {
                 RecipeTool.main(args);
             }
-            validate(EntityType.PROCESS.toString(), processFile);
-            return submitAndSchedule(EntityType.PROCESS.toString(), processFile);
+            validate(EntityType.PROCESS.toString(), processFile, skipDryRun);
+            return submitAndSchedule(EntityType.PROCESS.toString(), processFile, skipDryRun);
         } catch (Exception e) {
             throw new FalconCLIException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 07fafb5..4d45cc7 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -49,7 +49,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract boolean isAlive(Cluster cluster) throws FalconException;
 
-    public abstract void schedule(Entity entity) throws FalconException;
+    public abstract void schedule(Entity entity, Boolean skipDryRun) throws FalconException;
 
     public abstract String suspend(Entity entity) throws FalconException;
 
@@ -61,7 +61,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract void reRun(String cluster, String wfId, Properties props, boolean isForced) throws FalconException;
 
-    public abstract void dryRun(Entity entity, String clusterName) throws FalconException;
+    public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException;
 
     public abstract boolean isActive(Entity entity) throws FalconException;
 
@@ -88,9 +88,10 @@ public abstract class AbstractWorkflowEngine {
     public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
                                                       List<LifeCycle> lifeCycles) throws FalconException;
 
-    public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException;
+    public abstract String update(Entity oldEntity, Entity newEntity,
+                                  String cluster, Boolean skipDryRun) throws FalconException;
 
-    public abstract String touch(Entity entity, String cluster) throws FalconException;
+    public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException;
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 2f3dc6f..7e6cd6c 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -112,6 +112,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
     private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
+    private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun";
 
     private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100; // milliseconds
     private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
@@ -142,7 +143,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void schedule(Entity entity) throws FalconException {
+    public void schedule(Entity entity, Boolean skipDryRun) throws FalconException {
         Map<String, BundleJob> bundleMap = findLatestBundle(entity);
         List<String> schedClusters = new ArrayList<String>();
         for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) {
@@ -168,7 +169,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 }
 
                 //Do dryRun of coords before schedule as schedule is asynchronous
-                dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+                dryRunInternal(cluster, new Path(properties.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun);
                 scheduleEntity(clusterName, properties, entity);
             }
         }
@@ -196,18 +197,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void dryRun(Entity entity, String clusterName) throws FalconException {
+    public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
         OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
         Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis());
         Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName);
         Properties props = builder.build(cluster, buildPath);
         if (props != null) {
-            dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+            dryRunInternal(cluster, new Path(props.getProperty(OozieEntityBuilder.ENTITY_PATH)), skipDryRun);
         }
     }
 
+    private void dryRunInternal(Cluster cluster, Path buildPath, Boolean skipDryRun) throws FalconException {
+        if (null != skipDryRun && skipDryRun) {
+            LOG.info("Skipping dryrun as directed by param in cli/RestApi.");
+            return;
+        } else {
+            String skipDryRunStr = RuntimeProperties.get().getProperty(FALCON_SKIP_DRYRUN, "false").toLowerCase();
+            if (Boolean.valueOf(skipDryRunStr)) {
+                LOG.info("Skipping dryrun as directed by Runtime properties.");
+                return;
+            }
+        }
 
-    private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException {
         BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath);
         OozieClient client = OozieClientFactory.get(cluster.getName());
         for (COORDINATOR coord : bundle.getCoordinator()) {
@@ -322,7 +333,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
 
         return Collections.max(bundles, new Comparator<BundleJob>() {
-            @Override public int compare(BundleJob o1, BundleJob o2) {
+            @Override
+            public int compare(BundleJob o1, BundleJob o2) {
                 return o1.getCreatedTime().compareTo(o2.getCreatedTime());
             }
         });
@@ -683,7 +695,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         for (WorkflowAction action : wfActions) {
             if (action.getType().equalsIgnoreCase("sub-workflow") && StringUtils.isNotEmpty(action.getExternalId())) {
                 // if the action is sub-workflow, get job urls of all actions within the sub-workflow
-                List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster, action.getExternalId()).getActions();
+                List<WorkflowAction> subWorkFlowActions = getWorkflowInfo(cluster,
+                        action.getExternalId()).getActions();
                 for (WorkflowAction subWfAction : subWorkFlowActions) {
                     if (!subWfAction.getType().startsWith(":")) {
                         InstancesResult.InstanceAction instanceAction =
@@ -1039,7 +1052,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
+    public String update(Entity oldEntity, Entity newEntity,
+                         String cluster, Boolean skipDryRun) throws FalconException {
         BundleJob bundle = findLatestBundle(oldEntity, cluster);
 
         boolean entityUpdated = false;
@@ -1068,26 +1082,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 return getUpdateString(newEntity, new Date(), bundle, bundle);
             }
 
-            LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle
-                .getId());
+            LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
+                    newEntity.toShortString(), cluster, bundle.getId());
             result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle,
-                    CurrentUser.getUser())).append("\n");
+                    CurrentUser.getUser(), skipDryRun)).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
                 bundle.getId());
         }
 
-        result.append(updateDependents(clusterEntity, oldEntity, newEntity));
+        result.append(updateDependents(clusterEntity, oldEntity, newEntity, skipDryRun));
         return result.toString();
     }
 
     @Override
-    public String touch(Entity entity, String cluster) throws FalconException {
+    public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException {
         BundleJob bundle = findLatestBundle(entity, cluster);
         Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
         StringBuilder result = new StringBuilder();
         if (bundle != MISSING) {
-            LOG.info("Updating entity {} for cluster: {}, bundle: {}", entity.toShortString(), cluster, bundle.getId());
-            String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser());
+            LOG.info("Updating entity {} for cluster: {}, bundle: {}",
+                    entity.toShortString(), cluster, bundle.getId());
+            String output = updateInternal(entity, entity, clusterEntity, bundle, CurrentUser.getUser(), skipDryRun);
             result.append(output).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}", entity.toShortString(), cluster,
                     bundle.getId());
@@ -1124,7 +1139,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return  builder.toString();
     }
 
-    private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity) throws FalconException {
+    private String updateDependents(Cluster cluster, Entity oldEntity,
+                                    Entity newEntity, Boolean skipDryRun) throws FalconException {
         //Update affected entities
         Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity);
         StringBuilder result = new StringBuilder();
@@ -1147,7 +1163,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
 
             result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle,
-                affectedProcBundle.getUser())).append("\n");
+                affectedProcBundle.getUser(), skipDryRun)).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
                 affectedEntity.toShortString(), cluster, affectedProcBundle.getId());
         }
@@ -1178,7 +1194,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return null;
     }
 
-    private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime) throws FalconException {
+    private void updateCoords(String cluster, BundleJob bundle,
+                              int concurrency, Date endTime) throws FalconException {
         if (endTime.compareTo(now()) <= 0) {
             throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
@@ -1217,14 +1234,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
-        String user) throws FalconException {
+        String user, Boolean skipDryRun) throws FalconException {
         String clusterName = cluster.getName();
 
         Date effectiveTime = getEffectiveTime(cluster, newEntity);
         LOG.info("Effective time " + effectiveTime);
 
         //Validate that new entity can be scheduled
-        dryRunForUpdate(cluster, newEntity, effectiveTime);
+        dryRunForUpdate(cluster, newEntity, effectiveTime, skipDryRun);
 
         boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus());
 
@@ -1256,11 +1273,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
     }
 
-    private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime) throws FalconException {
+    private void dryRunForUpdate(Cluster cluster, Entity entity, Date startTime,
+                                 Boolean skipDryRun) throws FalconException {
         Entity clone = entity.copy();
         EntityUtil.setStartDate(clone, cluster.getName(), startTime);
         try {
-            dryRun(clone, cluster.getName());
+            dryRun(clone, cluster.getName(), skipDryRun);
         } catch (FalconException e) {
             throw new FalconException("The new entity " + entity.toShortString() + " can't be scheduled", e);
         }
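
Note the precedence encoded in dryRunInternal above: an explicit skipDryRun=true from the CLI or REST
call skips the Oozie dry run outright; otherwise the engine consults the falcon.skip.dryrun runtime
property, which defaults to false. A self-contained sketch of that resolution logic, with the runtime
property lookup stubbed out via java.util.Properties:

    import java.util.Properties;

    public final class DryRunDecision {
        private static final String FALCON_SKIP_DRYRUN = "falcon.skip.dryrun";

        // An explicit true from the request wins; an explicit false or a null
        // falls through to the runtime property, which defaults to false.
        static boolean shouldSkipDryRun(Boolean requestParam, Properties runtimeProps) {
            if (requestParam != null && requestParam) {
                return true;
            }
            return Boolean.valueOf(runtimeProps.getProperty(FALCON_SKIP_DRYRUN, "false").toLowerCase());
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(FALCON_SKIP_DRYRUN, "true");
            System.out.println(shouldSkipDryRun(null, props));             // true, via runtime property
            System.out.println(shouldSkipDryRun(false, new Properties())); // false, the default
        }
    }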

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index f2f9826..78964dd 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -206,7 +206,7 @@ public abstract class AbstractEntityManager {
      * @param type entity type
      * @return APIResule -Succeeded or Failed
      */
-    public APIResult validate(HttpServletRequest request, String type) {
+    public APIResult validate(HttpServletRequest request, String type, Boolean skipDryRun) {
         try {
             EntityType entityType = EntityType.getEnum(type);
             Entity entity = deserializeEntity(request, entityType);
@@ -217,7 +217,7 @@ public abstract class AbstractEntityManager {
                 Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
                 for (String cluster : clusters) {
                     try {
-                        getWorkflowEngine().dryRun(entity, cluster);
+                        getWorkflowEngine().dryRun(entity, cluster, skipDryRun);
                     } catch (FalconException e) {
                         throw new FalconException("dryRun failed on cluster " + cluster, e);
                     }
@@ -267,7 +267,8 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    public APIResult update(HttpServletRequest request, String type, String entityName, String colo) {
+    public APIResult update(HttpServletRequest request, String type, String entityName,
+                            String colo, Boolean skipDryRun) {
         checkColo(colo);
         List<Entity> tokenList = null;
         try {
@@ -292,7 +293,7 @@ public abstract class AbstractEntityManager {
                 oldClusters.removeAll(newClusters); //deleted clusters
 
                 for (String cluster : newClusters) {
-                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster));
+                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, skipDryRun));
                 }
                 for (String cluster : oldClusters) {
                     getWorkflowEngine().delete(oldEntity, cluster);

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index e38749a..5b415a2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -66,10 +66,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
     public APIResult schedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
             @Dimension("entityName") @PathParam("entity") String entity,
-            @Dimension("colo") @PathParam("colo") String colo) {
+            @Dimension("colo") @PathParam("colo") String colo,
+            @QueryParam("skipDryRun") Boolean skipDryRun) {
         checkColo(colo);
         try {
-            scheduleInternal(type, entity);
+            scheduleInternal(type, entity, skipDryRun);
             return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") scheduled successfully");
         } catch (Throwable e) {
             LOG.error("Unable to schedule workflow", e);
@@ -77,7 +78,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         }
     }
 
-    private synchronized void scheduleInternal(String type, String entity)
+    private synchronized void scheduleInternal(String type, String entity, Boolean skipDryRun)
         throws FalconException, AuthorizationException {
 
         checkSchedulableEntity(type);
@@ -90,7 +91,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
                         + entityObj.toShortString());
             }
             LOG.info("Memory lock obtained for {} by {}", entityObj.toShortString(), Thread.currentThread().getName());
-            getWorkflowEngine().schedule(entityObj);
+            getWorkflowEngine().schedule(entityObj, skipDryRun);
         } catch (Exception e) {
             throw new FalconException("Entity schedule failed for " + type + ": " + entity, e);
         } finally {
@@ -110,12 +111,13 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
      */
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
-            @Dimension("colo") @PathParam("colo") String colo) {
+            @Dimension("colo") @PathParam("colo") String colo,
+            @QueryParam("skipDryRun") Boolean skipDryRun) {
         checkColo(colo);
         try {
             checkSchedulableEntity(type);
             Entity entity = submitInternal(request, type);
-            scheduleInternal(type, entity.getName());
+            scheduleInternal(type, entity.getName(), skipDryRun);
             return new APIResult(APIResult.Status.SUCCEEDED,
                     entity.getName() + "(" + type + ") scheduled successfully");
         } catch (Throwable e) {
@@ -265,14 +267,15 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
      */
     public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
                            @Dimension("entityName") @PathParam("entity") String entityName,
-                           @Dimension("colo") @QueryParam("colo") String colo) {
+                           @Dimension("colo") @QueryParam("colo") String colo,
+                           @QueryParam("skipDryRun") Boolean skipDryRun) {
         checkColo(colo);
         StringBuilder result = new StringBuilder();
         try {
             Entity entity = EntityUtil.getEntity(type, entityName);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             for (String cluster : clusters) {
-                result.append(getWorkflowEngine().touch(entity, cluster));
+                result.append(getWorkflowEngine().touch(entity, cluster, skipDryRun));
             }
         } catch (Throwable e) {
             LOG.error("Touch failed", e);

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index d22e8a3..ceabb06 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -174,7 +174,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     @Override
-    public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type) {
+    public APIResult validate(@Context final HttpServletRequest request, @PathParam("type") final String type,
+                              @QueryParam("skipDryRun") final Boolean skipDryRun) {
         final HttpServletRequest bufferedRequest = getBufferedRequest(request);
         EntityType entityType = EntityType.getEnum(type);
         final Entity entity;
@@ -192,7 +193,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("validate", bufferedRequest, type);
+                return getEntityManager(colo).invoke("validate", bufferedRequest, type, skipDryRun);
             }
         }.execute();
     }
@@ -245,7 +246,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult update(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
             @Dimension("entityName") @PathParam("entity") final String entityName,
-            @Dimension("colo") @QueryParam("colo") String ignore) {
+            @Dimension("colo") @QueryParam("colo") String ignore,
+            @QueryParam("skipDryRun") final Boolean skipDryRun) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         final Set<String> oldColos = getApplicableColos(type, entityName);
@@ -281,7 +283,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
                 @Override
                 protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo);
+                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName,
+                            colo, skipDryRun);
                 }
             }.execute());
         }
@@ -308,7 +311,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
         // update only if all are updated
         if (!embeddedMode && result) {
-            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo));
+            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, skipDryRun));
         }
 
         return consolidateResult(results, APIResult.class);
@@ -322,7 +325,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult touch(
             @Dimension("entityType") @PathParam("type") final String type,
             @Dimension("entityName") @PathParam("entity") final String entityName,
-            @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+            @Dimension("colo") @QueryParam("colo") final String coloExpr,
+            @QueryParam("skipDryRun") final Boolean skipDryRun) {
         final Set<String> colosFromExp = getColosFromExpression(coloExpr, type, entityName);
         return new EntityProxy(type, entityName) {
             @Override
@@ -332,7 +336,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("touch", type, entityName, colo);
+                return getEntityManager(colo).invoke("touch", type, entityName, colo, skipDryRun);
             }
         }.execute();
     }
@@ -384,7 +388,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult schedule(@Context final HttpServletRequest request,
                               @Dimension("entityType") @PathParam("type") final String type,
                               @Dimension("entityName") @PathParam("entity") final String entity,
-                              @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+                              @Dimension("colo") @QueryParam("colo") final String coloExpr,
+                              @QueryParam("skipDryRun") final Boolean skipDryRun) {
 
         final HttpServletRequest bufferedRequest = getBufferedRequest(request);
         return new EntityProxy(type, entity) {
@@ -395,7 +400,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected APIResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo);
+                return getEntityManager(colo).invoke("schedule", bufferedRequest, type, entity, colo, skipDryRun);
             }
         }.execute();
     }
@@ -408,12 +413,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     @Override
     public APIResult submitAndSchedule(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
-            @Dimension("colo") @QueryParam("colo") String coloExpr) {
+            @Dimension("colo") @QueryParam("colo") String coloExpr,
+            @QueryParam("skipDryRun") Boolean skipDryRun) {
         BufferedRequest bufferedRequest = new BufferedRequest(request);
         String entity = getEntity(bufferedRequest, type).getName();
         Map<String, APIResult> results = new HashMap<String, APIResult>();
         results.put("submit", submit(bufferedRequest, type, coloExpr));
-        results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr));
+        results.put("schedule", schedule(bufferedRequest, type, entity, coloExpr, skipDryRun));
         return consolidateResult(results, APIResult.class);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index be1fe1f..cce8737 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -95,8 +95,7 @@ public class EntityManagerTest extends AbstractEntityManager {
                 invalidProcessXML);
 
         try {
-            validate(mockHttpServletRequest,
-                    EntityType.PROCESS.name());
+            validate(mockHttpServletRequest, EntityType.PROCESS.name(), false);
             Assert.fail("Invalid entity type was accepted by the system");
         } catch (FalconWebException ignore) {
             // ignore
@@ -110,8 +109,7 @@ public class EntityManagerTest extends AbstractEntityManager {
                 invalidProcessXML);
 
         try {
-            validate(mockHttpServletRequest,
-                    "InvalidEntityType");
+            validate(mockHttpServletRequest, "InvalidEntityType", false);
             Assert.fail("Invalid entity type was accepted by the system");
         } catch (FalconWebException ignore) {
             // ignore

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index e898fc3..eb65cb3 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -118,8 +118,9 @@ public class FalconUnitClient extends AbstractFalconClient {
      * @throws FalconException
      */
     @Override
-    public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconCLIException {
-        return schedule(entityType, entityName, null, 0, cluster);
+    public APIResult schedule(EntityType entityType, String entityName,
+                              String cluster, Boolean skipDryRun) throws FalconCLIException {
+        return schedule(entityType, entityName, null, 0, cluster, skipDryRun);
     }
 
 
@@ -133,7 +134,7 @@ public class FalconUnitClient extends AbstractFalconClient {
      * @return boolean
      */
     public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances,
-                              String cluster) throws FalconCLIException {
+                              String cluster, Boolean skipDryRun) throws FalconCLIException {
         try {
             FalconUnitHelper.checkSchedulableEntity(entityType.toString());
             Entity entity = EntityUtil.getEntity(entityType, entityName);
@@ -146,7 +147,7 @@ public class FalconUnitClient extends AbstractFalconClient {
             if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) {
                 updateStartAndEndTime((Process) entity, startTime, numInstances, cluster);
             }
-            workflowEngine.schedule(entity);
+            workflowEngine.schedule(entity, skipDryRun);
             LOG.info(entityName + " is scheduled successfully");
             return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully");
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 9f00d94..997b301 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -147,7 +147,7 @@ public class FalconUnitTestBase {
     }
 
     public APIResult scheduleProcess(String processName, String startTime, int numInstances,
-                                   String cluster, String localWfPath) throws FalconException,
+                                   String cluster, String localWfPath, Boolean skipDryRun) throws FalconException,
             IOException, FalconCLIException {
         Process processEntity = configStore.get(EntityType.PROCESS, processName);
         if (processEntity == null) {
@@ -155,16 +155,16 @@ public class FalconUnitTestBase {
         }
         String workflowPath = processEntity.getWorkflow().getPath();
         fs.copyFromLocalFile(new Path(localWfPath), new Path(workflowPath));
-        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster);
+        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun);
     }
 
     public APIResult scheduleProcess(String processName, String startTime, int numInstances,
-                                   String cluster) throws FalconException, FalconCLIException {
+                                   String cluster, Boolean skipDryRun) throws FalconException, FalconCLIException {
         Process processEntity = configStore.get(EntityType.PROCESS, processName);
         if (processEntity == null) {
             throw new FalconException("Process not found " + processName);
         }
-        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster);
+        return falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, numInstances, cluster, skipDryRun);
     }
 
     private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 57b7b1b..498f50e 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -44,7 +44,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         createData("in", "local", scheduleTime, "input.txt");
         result = submitProcess(getAbsolutePath("/process.xml"), "/app/oozie-mr");
         assertStatus(result);
-        result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"));
+        result = scheduleProcess("process", scheduleTime, 1, "local", getAbsolutePath("/workflow.xml"), true);
         assertStatus(result);
         waitForStatus(EntityType.PROCESS, "process", scheduleTime);
         InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(EntityType.PROCESS,
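
For readers following the signature change above, a minimal sketch of how a test built on FalconUnitTestBase might drive the new Boolean skipDryRun argument; the class name, process name, schedule time, cluster and workflow path below are placeholders rather than anything taken from this patch:

    import org.apache.falcon.resource.APIResult;
    import org.apache.falcon.unit.FalconUnitTestBase;

    public class SkipDryRunUsageSketch extends FalconUnitTestBase {
        public void scheduleSkippingDryRun() throws Exception {
            // true asks the workflow engine to bypass the Oozie dry-run validation
            APIResult result = scheduleProcess("sample-process", "2015-06-20T00:00Z", 1,
                    "local", getAbsolutePath("/workflow.xml"), true);
            assertStatus(result);

            // null is simply passed through to the workflow engine, i.e. no explicit preference
            result = scheduleProcess("sample-process", "2015-06-20T00:00Z", 1,
                    "local", getAbsolutePath("/workflow.xml"), null);
            assertStatus(result);
        }
    }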

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index 3bd625c..bf538dc 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -70,7 +70,8 @@ public class ConfigSyncService extends AbstractEntityManager {
     public APIResult update(@Context HttpServletRequest request,
                             @Dimension("entityType") @PathParam("type") String type,
                             @Dimension("entityName") @PathParam("entity") String entityName,
-                            @Dimension("colo") @QueryParam("colo") String colo) {
-        return super.update(request, type, entityName, colo);
+                            @Dimension("colo") @QueryParam("colo") String colo,
+                            @QueryParam("skipDryRun") Boolean skipDryRun) {
+        return super.update(request, type, entityName, colo, skipDryRun);
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index a2af0cd..1f8cc1b 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -126,8 +126,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     public APIResult schedule(@Context HttpServletRequest request,
                               @Dimension("entityType") @PathParam("type") String type,
                               @Dimension("entityName") @PathParam("entity") String entity,
-                              @Dimension("colo") @QueryParam("colo") String colo) {
-        return super.schedule(request, type, entity, colo);
+                              @Dimension("colo") @QueryParam("colo") String colo,
+                              @QueryParam("skipDryRun") Boolean skipDryRun) {
+        return super.schedule(request, type, entity, colo, skipDryRun);
     }
 
     @POST
@@ -160,8 +161,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
     @Monitored(event = "validate")
     @Override
-    public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type) {
-        return super.validate(request, type);
+    public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type,
+                              @QueryParam("skipDryRun") Boolean skipDryRun) {
+        return super.validate(request, type, skipDryRun);
     }
 
     @POST
@@ -171,8 +173,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     @Override
     public APIResult touch(@Dimension("entityType") @PathParam("type") String type,
                            @Dimension("entityName") @PathParam("entity") String entityName,
-                           @Dimension("colo") @QueryParam("colo") String colo) {
-        return super.touch(type, entityName, colo);
+                           @Dimension("colo") @QueryParam("colo") String colo,
+                           @QueryParam("skipDryRun") Boolean skipDryRun) {
+        return super.touch(type, entityName, colo, skipDryRun);
     }
 
     @GET
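
Since skipDryRun is exposed here as a plain query parameter, it can also be supplied directly on the REST call; a minimal Jersey client sketch, where the endpoint URL, port, entity name and the omitted authentication handling are placeholders and not part of this patch:

    import com.sun.jersey.api.client.Client;
    import com.sun.jersey.api.client.ClientResponse;
    import com.sun.jersey.api.client.WebResource;
    import javax.ws.rs.core.MediaType;

    public class SkipDryRunRestSketch {
        public static void main(String[] args) {
            Client client = Client.create();
            // schedule an already submitted process and ask the server to skip the Oozie dry run
            WebResource resource = client
                    .resource("http://localhost:15000/api/entities/schedule/process/sample-process")
                    .queryParam("skipDryRun", "true");
            ClientResponse response = resource
                    .accept(MediaType.TEXT_XML)
                    .type(MediaType.TEXT_XML)
                    .post(ClientResponse.class);
            System.out.println("HTTP status: " + response.getStatus());
        }
    }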

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index e328d69..0062070 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -184,6 +184,33 @@ public class FalconCLIIT {
 
     }
 
+    public void testSkipDryRunValidCommands() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        submitTestFiles(context, overlay);
+
+        Assert.assertEquals(
+                executeWithURL("entity -schedule -skipDryRun -type cluster -name " + overlay.get("cluster")), -1);
+
+        Assert.assertEquals(
+                executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0);
+
+        Assert.assertEquals(
+                executeWithURL("entity -schedule -type process -skipDryRun -name " + overlay.get("processName")), 0);
+
+        Assert.assertEquals(
+                executeWithURL("entity -touch -skipDryRun -name " + overlay.get("processName") + " -type process"), 0);
+
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(
+                executeWithURL("entity -submitAndSchedule -skipDryRun -type feed -file " + filePath), 0);
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(
+                executeWithURL("entity -validate -skipDryRun -type process -file " + filePath), 0);
+
+    }
+
     public void testSuspendResumeStatusEntityValidCommands() throws Exception {
 
         TestContext context = new TestContext();

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index c602ffb..f0cee61 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -131,7 +131,8 @@ public class EntityManagerJerseyIT {
         context.assertSuccessful(response);
     }
 
-    private ClientResponse update(TestContext context, Entity entity, Date endTime) throws Exception {
+    private ClientResponse update(TestContext context, Entity entity,
+                                  Date endTime, Boolean skipDryRun) throws Exception {
         File tmpFile = TestContext.getTempFile();
         entity.getEntityType().getMarshaller().marshal(entity, tmpFile);
         WebResource resource = context.service.path("api/entities/update/"
@@ -139,19 +140,24 @@ public class EntityManagerJerseyIT {
         if (endTime != null) {
             resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime));
         }
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
         return resource.header("Cookie", context.getAuthenticationToken())
                 .accept(MediaType.TEXT_XML)
                 .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
     }
 
-    private ClientResponse touch(TestContext context, Entity entity) {
+    private ClientResponse touch(TestContext context, Entity entity, Boolean skipDryRun) {
         WebResource resource = context.service.path("api/entities/touch/"
                 + entity.getEntityType().name().toLowerCase() + "/" + entity.getName());
-        ClientResponse clientResponse = resource
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
+        return resource
                 .header("Cookie", context.getAuthenticationToken())
                 .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
                 .post(ClientResponse.class);
-        return clientResponse;
     }
 
     @Test
@@ -174,7 +180,7 @@ public class EntityManagerJerseyIT {
 
         //change output feed path and update feed as another user
         feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
-        ClientResponse response = update(context, feed, null);
+        ClientResponse response = update(context, feed, null, false);
         context.assertSuccessful(response);
 
         bundles = OozieTestUtils.getBundles(context);
@@ -222,7 +228,7 @@ public class EntityManagerJerseyIT {
     }
 
     public void testDryRun() throws Exception {
-        //Schedule of invalid process should fail because of dryRun
+        //Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped
         TestContext context = newContext();
         Map<String, String> overlay = context.getUniqueOverlay();
         String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
@@ -237,7 +243,7 @@ public class EntityManagerJerseyIT {
         ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS);
         context.assertFailure(response);
 
-        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false);
+        context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false, null);
 
         //Fix the process and then submitAndSchedule should succeed
         Iterator<Property> itr = process.getProperties().getProperties().iterator();
@@ -256,8 +262,13 @@ public class EntityManagerJerseyIT {
         //Update with invalid property should fail again
         process.getProperties().getProperties().add(prop);
         updateEndtime(process);
-        response = update(context, process, null);
+        response = update(context, process, null, null);
         context.assertFailure(response);
+
+        // update where dryrun is disabled should succeed.
+        response = update(context, process, null, true);
+        context.assertSuccessful(response);
+
     }
 
     @Test
@@ -274,7 +285,7 @@ public class EntityManagerJerseyIT {
         process.getProperties().getProperties().get(0).setName("newprop");
         Date endTime = getEndTime();
         process.getClusters().getClusters().get(0).getValidity().setEnd(endTime);
-        response = update(context, process, endTime);
+        response = update(context, process, endTime, null);
         context.assertSuccessful(response);
 
         //Since the process endtime = update effective time, it shouldn't create new bundle
@@ -315,7 +326,7 @@ public class EntityManagerJerseyIT {
 
         updateEndtime(process);
         Date endTime = getEndTime();
-        response = update(context, process, endTime);
+        response = update(context, process, endTime, null);
         context.assertSuccessful(response);
 
         //Assert that update creates new bundle and old coord is running
@@ -349,7 +360,7 @@ public class EntityManagerJerseyIT {
         Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
 
         updateEndtime(process);
-        ClientResponse response = update(context, process, null);
+        ClientResponse response = update(context, process, null, null);
         context.assertSuccessful(response);
 
         //Assert that update does not create new bundle
@@ -371,13 +382,13 @@ public class EntityManagerJerseyIT {
         //Update end time of process required for touch
         Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
         updateEndtime(process);
-        ClientResponse response = update(context, process, null);
+        ClientResponse response = update(context, process, null, null);
         context.assertSuccessful(response);
         bundles = OozieTestUtils.getBundles(context);
         Assert.assertEquals(bundles.size(), 1);
 
         //Calling force update
-        response = touch(context, process);
+        response = touch(context, process, true);
         context.assertSuccessful(response);
         OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING);
 
@@ -871,7 +882,7 @@ public class EntityManagerJerseyIT {
         Date endTime = getEndTime();
         ExecutorService service =  Executors.newSingleThreadExecutor();
         Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime));
-        response = update(context, process, endTime);
+        response = update(context, process, endTime, false);
         ClientResponse duplicateUpdateThreadResponse = future.get();
 
         // since there are duplicate threads for updates, there is no guarantee which request will succeed
@@ -918,7 +929,7 @@ public class EntityManagerJerseyIT {
 
         @Override
         public ClientResponse call() throws Exception {
-            return update(context, process, endTime);
+            return update(context, process, endTime, false);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/17e2f71c/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 7b227b3..4a25b88 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -230,10 +230,11 @@ public class TestContext {
     }
 
     public void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception {
-        scheduleProcess(processTemplate, overlay, true);
+        scheduleProcess(processTemplate, overlay, true, null);
     }
 
-    public void scheduleProcess(String processTemplate, Map<String, String> overlay, boolean succeed) throws Exception {
+    public void scheduleProcess(String processTemplate, Map<String, String> overlay,
+                                boolean succeed, Boolean skipDryRun) throws Exception {
         ClientResponse response = submitToFalcon(CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
         assertSuccessful(response);
 
@@ -243,7 +244,7 @@ public class TestContext {
         response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
         assertSuccessful(response);
 
-        response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS);
+        response = submitAndSchedule(processTemplate, overlay, EntityType.PROCESS, skipDryRun);
         if (succeed) {
             assertSuccessful(response);
         } else {
@@ -278,10 +279,20 @@ public class TestContext {
 
     public ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType)
         throws Exception {
+        return submitAndSchedule(template, overlay, entityType, null);
+    }
+
+    public ClientResponse submitAndSchedule(String template, Map<String, String> overlay,
+                                            EntityType entityType, Boolean skipDryRun)
+        throws Exception {
         String tmpFile = overlayParametersOverTemplate(template, overlay);
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
-        return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase())
+        WebResource resource = service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase());
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
+        return resource
                 .header("Cookie", getAuthenticationToken())
                 .accept(MediaType.TEXT_XML)
                 .type(MediaType.TEXT_XML)
@@ -290,10 +301,20 @@ public class TestContext {
 
     public ClientResponse validate(String template, Map<String, String> overlay, EntityType entityType)
         throws Exception {
+        return validate(template, overlay, entityType, null);
+    }
+
+    public ClientResponse validate(String template, Map<String, String> overlay,
+                                   EntityType entityType, Boolean skipDryRun)
+        throws Exception {
         String tmpFile = overlayParametersOverTemplate(template, overlay);
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
-        return this.service.path("api/entities/validate/" + entityType.name().toLowerCase())
+        WebResource resource = service.path("api/entities/validate/" + entityType.name().toLowerCase());
+        if (null != skipDryRun) {
+            resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun));
+        }
+        return resource
             .header("Cookie", getAuthenticationToken())
             .accept(MediaType.TEXT_XML)
             .type(MediaType.TEXT_XML)


[3/3] falcon git commit: FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.

Posted by aj...@apache.org.
FALCON-1038 Log mover fails for map-reduce action. Contributed by Peeyush Bishnoi.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/fbb4d314
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/fbb4d314
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/fbb4d314

Branch: refs/heads/0.7
Commit: fbb4d314297e10b723e7acf53a50426e50333037
Parents: 17e2f71
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Aug 25 14:00:37 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Aug 25 17:20:45 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/logging/DefaultTaskLogRetriever.java |  2 +-
 .../org/apache/falcon/logging/JobLogMover.java  | 47 ++++++++++++--------
 .../falcon/logging/TaskLogRetrieverYarn.java    | 11 ++++-
 4 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1054fe..f7e9127 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava)
+    
     FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava)
 
     FALCON-1409 Update API throws NullPointerException(Sandeep Samudrala via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
index 962f891..82448d8 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/DefaultTaskLogRetriever.java
@@ -61,7 +61,7 @@ public class DefaultTaskLogRetriever extends Configured implements TaskLogURLRet
         }
     }
 
-    protected List<String> getFromHistory(String jodId) throws IOException {
+    protected List<String> getFromHistory(String jobId) throws IOException {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index ba669c8..478d68c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -91,17 +91,26 @@ public class JobLogMover {
                     }
                 }
             } else {
-                // if process wf with oozie engine
-                String subflowId = jobInfo.getExternalId();
-                copyOozieLog(client, fs, path, subflowId);
-                WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+                String flowId;
+                // if process wf with pig, hive
+                if (context.getUserWorkflowEngine().equals("pig")
+                        || context.getUserWorkflowEngine().equals("hive")) {
+                    flowId = jobInfo.getId();
+                } else {
+                    // if process wf with oozie engine
+                    flowId = jobInfo.getExternalId();
+                }
+                copyOozieLog(client, fs, path, flowId);
+                WorkflowJob subflowInfo = client.getJobInfo(flowId);
                 List<WorkflowAction> actions = subflowInfo.getActions();
                 for (WorkflowAction action : actions) {
-                    if (action.getType().equals("pig")
-                            || action.getType().equals("java")) {
+                    if (isActionTypeSupported(action)) {
+                        LOG.info("Copying hadoop TT log for action: {} of type: {}",
+                                action.getName(), action.getType());
                         copyTTlogs(fs, path, action);
                     } else {
-                        LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
+                        LOG.info("Ignoring hadoop TT log for non supported action: {} of type: {}",
+                                action.getName(), action.getType());
                     }
                 }
             }
@@ -114,8 +123,8 @@ public class JobLogMover {
     }
 
     private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
-        // userWorkflowEngine will be null for replication and "pig" for pig
-        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+        // userWorkflowEngine will be null for replication and non-null for pig, hive and oozie
+        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) == null;
     }
 
     private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
@@ -134,7 +143,7 @@ public class JobLogMover {
             for (String ttLogURL : ttLogUrls) {
                 LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
                 InputStream in = getURLinputStream(new URL(ttLogURL));
-                OutputStream out = fs.create(new Path(path, action.getName() + "_"
+                OutputStream out = fs.create(new Path(path, action.getName() + "_" + action.getType() + "_"
                         + getMappedStatus(action.getStatus()) + "-" + index + ".log"));
                 IOUtils.copyBytes(in, out, 4096, true);
                 LOG.info("Copied log to {}", path);
@@ -143,6 +152,13 @@ public class JobLogMover {
         }
     }
 
+    private boolean isActionTypeSupported(WorkflowAction action) {
+        return action.getType().equals("pig")
+                || action.getType().equals("hive")
+                || action.getType().equals("java")
+                || action.getType().equals("map-reduce");
+    }
+
     private String getMappedStatus(WorkflowAction.Status status) {
         if (status == WorkflowAction.Status.FAILED
                 || status == WorkflowAction.Status.KILLED
@@ -161,14 +177,9 @@ public class JobLogMover {
 
     @SuppressWarnings("unchecked")
     private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName(Configuration conf) {
-        try {
-            if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
-                return TaskLogRetrieverYarn.class;
-            }
-            return (Class<? extends TaskLogURLRetriever>)
-                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
-        } catch (ClassNotFoundException e) {
-            LOG.warn("V1 Retriever missing, falling back to Default retriever");
+        if (YARN.equals(conf.get(MAPREDUCE_FRAMEWORK))) {
+            return TaskLogRetrieverYarn.class;
+        } else {
             return DefaultTaskLogRetriever.class;
         }
     }
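
Condensing the behavioural change in this file into a standalone sketch: TT logs are now copied for pig, hive, java and map-reduce actions instead of only pig and java, and the task-log retriever is chosen purely from the mapreduce framework setting. The class below is illustrative only; the YARN and MAPREDUCE_FRAMEWORK constants are assumed to correspond to the string "yarn" and the mapreduce.framework.name property:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class LogMoverChangeSketch {
        // action types whose hadoop TT logs get copied after this patch
        private static final Set<String> SUPPORTED_ACTION_TYPES =
                new HashSet<String>(Arrays.asList("pig", "hive", "java", "map-reduce"));

        static boolean shouldCopyTTlogs(String actionType) {
            return SUPPORTED_ACTION_TYPES.contains(actionType);
        }

        static String retrieverFor(String mapreduceFramework) {
            // mirrors getLogRetrieverClassName: "yarn" selects the YARN retriever,
            // anything else falls back to the default retriever
            return "yarn".equals(mapreduceFramework)
                    ? "TaskLogRetrieverYarn" : "DefaultTaskLogRetriever";
        }

        public static void main(String[] args) {
            System.out.println(shouldCopyTTlogs("map-reduce")); // true, newly supported
            System.out.println(shouldCopyTTlogs("shell"));      // false, still ignored
            System.out.println(retrieverFor("yarn"));           // TaskLogRetrieverYarn
        }
    }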

http://git-wip-us.apache.org/repos/asf/falcon/blob/fbb4d314/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
index 61c5afb..146d53c 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/TaskLogRetrieverYarn.java
@@ -48,13 +48,22 @@ public class TaskLogRetrieverYarn extends DefaultTaskLogRetriever {
             LOG.warn("External id for workflow action is null");
             return null;
         }
+
+        if (conf.get(YARN_LOG_SERVER_URL) == null) {
+            LOG.warn("YARN log Server is null");
+            return null;
+        }
+
         try {
             Job job = cluster.getJob(jobID);
             if (job != null) {
                 TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
                 for (TaskCompletionEvent event : events) {
                     LogParams params = cluster.getLogParams(jobID, event.getTaskAttemptId());
-                    String url = SCHEME + conf.get(YARN_LOG_SERVER_URL) + "/"
+                    String url = (conf.get(YARN_LOG_SERVER_URL).startsWith(SCHEME)
+                            ? conf.get(YARN_LOG_SERVER_URL)
+                            : SCHEME + conf.get(YARN_LOG_SERVER_URL))
+                            + "/"
                             + event.getTaskTrackerHttp() + "/"
                             + params.getContainerId() + "/"
                             + params.getApplicationId() + "/"