You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/05 04:25:55 UTC

falcon git commit: FALCON-2039 Move falcon post processing to falcon server and remove post processing

Repository: falcon
Updated Branches:
  refs/heads/master 723f1f7f1 -> 7d9687bcb


FALCON-2039 Move falcon post processing to falcon server and remove post processing

Author: Praveen Adlakha <ad...@gmail.com>

Reviewers: @pallavi-rao, @vrangan

Closes #244 from PraveenAdlakha/2039 and squashes the following commits:

b71290a [Praveen Adlakha] process removed
26ddc02 [Praveen Adlakha] comments addressed
d4f4cf8 [Praveen Adlakha] fixed test cases
cdf9ae1 [Praveen Adlakha] documentation added
9aee018 [Praveen Adlakha] multithread added
b3115de [Praveen Adlakha] Agebased boolean check removed
22c25a3 [Praveen Adlakha] Startup properties
1bc278f [Praveen Adlakha] minor fixes done
71b8d1a [Praveen Adlakha] check added in service
dd58642 [Praveen Adlakha] FALCON-2039 Move falcon post processing to falcon server and remove post processing action from falcon workflow


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7d9687bc
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7d9687bc
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7d9687bc

Branch: refs/heads/master
Commit: 7d9687bcbc8a978ad361498084f3e473fd69fc9b
Parents: 723f1f7
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Fri Aug 5 09:55:49 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Aug 5 09:55:49 2016 +0530

----------------------------------------------------------------------
 common/src/main/resources/runtime.properties    |   2 +-
 common/src/main/resources/startup.properties    |   7 ++
 docs/src/site/twiki/Configuration.twiki         |  14 +++
 .../retention/AgeBasedWorkflowBuilder.java      |  32 ++++--
 .../engine/oozie/utils/OozieBuilderUtils.java   |   2 +
 oozie/pom.xml                                   |   1 -
 .../org/apache/falcon/logging/JobLogMover.java  |  14 +++
 .../oozie/DatabaseExportWorkflowBuilder.java    |  13 +--
 .../oozie/DatabaseImportWorkflowBuilder.java    |  13 +--
 .../OozieOrchestrationWorkflowBuilder.java      |  30 +++++
 .../feed/FSReplicationWorkflowBuilder.java      |  15 +--
 .../feed/FeedRetentionWorkflowBuilder.java      |  13 +--
 .../feed/HCatReplicationWorkflowBuilder.java    |  15 +--
 .../ProcessExecutionWorkflowBuilder.java        |  12 +-
 .../apache/falcon/service/LogMoverService.java  | 111 +++++++++++++++++++
 .../falcon/workflow/FalconPostProcessing.java   |  18 +--
 .../feed/OozieFeedWorkflowBuilderTest.java      |  48 ++++++++
 .../OozieProcessWorkflowBuilderTest.java        |  34 ++++++
 src/conf/startup.properties                     |   7 ++
 19 files changed, 295 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 643559e..ba4c055 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -51,4 +51,4 @@
 
 *.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
 
-######### Proxyuser Configuration End #########
\ No newline at end of file
+######### Proxyuser Configuration End #########

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 4b692a2..9207b25 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -83,6 +83,7 @@
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=
+#org.apache.falcon.service.LogMoverService
 
 ######### Implementation classes #########
 
@@ -336,3 +337,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Backlog Metric Properties
 #*.falcon.backlog.metricservice.emit.interval.millisecs=60000
 #*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Set to false to remove the post-processing actions from Falcon workflows
+*.falcon.postprocessing.enable=true
+
+### Maximum thread count of the LogMoverService thread pool
+*.falcon.logMoveService.threadCount=50

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index ce32019..c686d48 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -317,6 +317,20 @@ su - $OOZIE_USER
 Where $OOZIE_USER is the Oozie user. For example, oozie.
 </verbatim>
 
+---+++Disabling Falcon Post Processing
+Falcon post processing performs two tasks:
+It sends user notifications to ActiveMQ.
+It moves Oozie executor logs once the workflow finishes.
+
+If post processing fails for any reason, the user might end up with a backlog in the pipeline; that is why it has been made optional.
+
+To disable post processing, set falcon.postprocessing.enable to false and register LogMoverService as a workflow execution listener in startup.properties:
+<verbatim>
+*.falcon.postprocessing.enable=false
+*.workflow.execution.listeners=org.apache.falcon.service.LogMoverService
+</verbatim>
+*NOTE: Please make sure Oozie JMS notifications are enabled, as LogMoverService depends on them.*
+
 
 ---+++Enabling Falcon Native Scheudler
 You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
index 34b186e..dd0c6d2 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -59,19 +59,27 @@ public final class AgeBasedWorkflowBuilder {
 
         //Add eviction action
         ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
-        OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
-                OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(eviction);
-
-        //Add post-processing actions
-        ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
-        OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
-        OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
 
+        if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)){
+            OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.OK_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(eviction);
+        } else {
+            OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(eviction);
+
+            //Add post-processing actions
+            ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+            OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(success);
+
+            ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+            OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME,
+                    OozieBuilderUtils.FAIL_ACTION_NAME);
+            workflow.getDecisionOrForkOrJoin().add(fail);
+        }
         OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 8f1b53b..7d51c9a 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -102,6 +102,8 @@ public final class OozieBuilderUtils {
     public static final String ENTITY_PATH = "ENTITY_PATH";
     public static final String ENTITY_NAME = "ENTITY_NAME";
     public static final String IGNORE = "IGNORE";
+    public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
 
 
     static {

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 7bfb086..2adbba3 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -105,7 +105,6 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
-
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 64596c6..6ec2a20 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
@@ -68,6 +69,19 @@ public class JobLogMover {
         return conf == null ? new Configuration(): conf;
     }
 
+    /**
+     * Best-effort entry point for moving the logs of a finished workflow.
+     * When Hadoop security is enabled nothing is moved and an info message is logged instead;
+     * any exception raised by {@link #run(WorkflowExecutionContext)} is logged and suppressed.
+     *
+     * @param context execution context of the workflow whose logs are to be moved
+     */
+    public void moveLog(WorkflowExecutionContext context){
+        if (!UserGroupInformation.isSecurityEnabled()) {
+            try {
+                run(context);
+            } catch (Exception e) {
+                // Deliberately masked: a failed log mover must not fail the user workflow
+                LOG.error("Exception in job log mover:", e);
+            }
+            return;
+        }
+        LOG.info("Unable to move logs as security is enabled.");
+    }
+
     public int run(WorkflowExecutionContext context) {
         try {
             OozieClient client = new OozieClient(context.getWorkflowEngineUrl());

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
index 6468415..b86afaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
@@ -67,18 +67,7 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
         ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopExport);
         OozieUtils.marshalSqoopAction(action, actionJaxbElement);
 
-        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(action);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, action);
         decorateWorkflow(workflow, workflow.getName(), EXPORT_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
index 44562f2..5c95162 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -69,18 +69,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
         ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopImport);
         OozieUtils.marshalSqoopAction(action, actionJaxbElement);
 
-        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(action);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, action);
         decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 5ad3d03..9683e62 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -80,6 +80,13 @@ import java.util.Set;
 public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
     public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
 
+    // Raw value of the "falcon.postprocessing.enable" startup property, read when the builder is created.
+    private String enablePostprocessing = StartupProperties.get().getProperty("falcon.postprocessing.enable");
+
+    /**
+     * @return the raw string value read from the falcon.postprocessing.enable startup property
+     */
+    public String getEnablePostProcessing() {
+        return this.enablePostprocessing;
+    }
+
     protected static final String USER_ACTION_NAME = "user-action";
     protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
     protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
@@ -130,6 +137,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         return get(entity, cluster, lifecycle, Scheduler.OOZIE);
     }
 
+    /**
+     * Tells whether workflows built by this builder should include the post-processing actions.
+     *
+     * @return the result of {@link Boolean#parseBoolean(String)} applied to
+     *         {@link #getEnablePostProcessing()}
+     */
+    public Boolean isPostProcessingEnabled(){
+        final String rawFlag = getEnablePostProcessing();
+        return Boolean.parseBoolean(rawFlag);
+    }
+
     public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
                                                         Scheduler scheduler)
         throws FalconException {
@@ -219,6 +230,25 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         wf.getDecisionOrForkOrJoin().add(kill);
     }
 
+    /**
+     * Wires the given main action into the workflow and, when post processing is enabled,
+     * appends the success and failure post-processing actions after it.
+     *
+     * <p>With post processing disabled the main action transitions straight to the
+     * OK / FAIL end states; otherwise it transitions to the post-processing actions.
+     *
+     * <p>NOTE(review): the FS and HCat replication builders used to call
+     * addHDFSServersConfig(...) on the success/fail post-processing actions before delegating
+     * here; this helper does not - confirm that dropping it is intended.
+     *
+     * @param workflow workflow definition being assembled
+     * @param action   main action whose transitions are set and which is added first
+     * @throws FalconException declared for the post-processing action factories
+     */
+    protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{
+        final boolean postProcess = isPostProcessingEnabled();
+
+        addTransition(action,
+                postProcess ? SUCCESS_POSTPROCESS_ACTION_NAME : OK_ACTION_NAME,
+                postProcess ? FAIL_POSTPROCESS_ACTION_NAME : FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(action);
+
+        if (!postProcess) {
+            return;
+        }
+
+        // Post-processing action taken after the main action succeeds
+        ACTION onSuccess = getSuccessPostProcessAction();
+        addTransition(onSuccess, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(onSuccess);
+
+        // Post-processing action taken after the main action fails; always ends in FAIL
+        ACTION onFailure = getFailPostProcessAction();
+        addTransition(onFailure, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(onFailure);
+    }
+
     protected ACTION getSuccessPostProcessAction() throws FalconException {
         ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
         decorateWithOozieRetries(action);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index cfcc698..598cf6f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -59,20 +59,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         addAdditionalReplicationProperties(replication);
         enableCounters(replication);
         enableTDE(replication);
-        addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(replication);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addHDFSServersConfig(success, src, target);
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addHDFSServersConfig(fail, src, target);
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, replication);
         decorateWorkflow(workflow, wfName, start);
         return workflow;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index b9e3848..fd51ed0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -51,21 +51,10 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
     @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
         WORKFLOWAPP workflow = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
-
         //Add eviction action
         ACTION eviction = unmarshalAction(EVICTION_ACTION_TEMPLATE);
-        addTransition(eviction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(eviction);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
 
+        addPostProcessing(workflow, eviction);
         decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
         addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 3da97d3..f4eecb7 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -127,20 +127,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
 
         //Add cleanup action
         ACTION cleanup = unmarshalAction(CLEANUP_ACTION_TEMPLATE);
-        addTransition(cleanup, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(cleanup);
-
-        //Add post-processing actions
-        ACTION success = getSuccessPostProcessAction();
-        addHDFSServersConfig(success, src, target);
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(success);
-
-        ACTION fail = getFailPostProcessAction();
-        addHDFSServersConfig(fail, src, target);
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, cleanup);
         decorateWorkflow(workflow, wfName, start);
         setupHiveCredentials(src, target, workflow);
         return workflow;

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 5d2c43e..c31b4ee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -83,20 +83,10 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
             wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
             startAction = PREPROCESS_ACTION_NAME;
         }
-
         //Add user action
         ACTION userAction = getUserAction(cluster, buildPath);
-        addTransition(userAction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(userAction);
-
-        //Add post-processing
-        ACTION success = getSuccessPostProcessAction();
-        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(success);
 
-        ACTION fail = getFailPostProcessAction();
-        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
-        wfApp.getDecisionOrForkOrJoin().add(fail);
+        addPostProcessing(wfApp, userAction);
 
         decorateWorkflow(wfApp, wfName, startAction);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
new file mode 100644
index 0000000..1f3d0a0
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.logging.JobLogMover;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Workflow execution listener that moves job logs from inside the Falcon server.
+ *
+ * <p>On workflow success or failure, and only when the startup property
+ * {@code falcon.postprocessing.enable} does not parse to {@code true}, a {@link JobLogMover}
+ * task is handed to a bounded thread pool. While the pool's work queue is full the calling
+ * thread waits, polling every 500 ms.
+ */
+public class LogMoverService implements WorkflowExecutionListener  {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogMoverService.class);
+
+    public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
+
+    private static final int DEFAULT_THREAD_COUNT = 50;
+    private static final int CORE_THREAD_COUNT = 20;
+    private static final int QUEUE_CAPACITY = 50;
+    private static final long KEEP_ALIVE_SECONDS = 120;
+    private static final long QUEUE_FULL_BACKOFF_MILLIS = 500;
+
+    // Declared before executorService: the executor is built on top of this queue.
+    private final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
+    private final ExecutorService executorService = createExecutor();
+
+    /**
+     * Reads the maximum pool size from the startup property
+     * {@code falcon.logMoveService.threadCount}.
+     *
+     * @return the configured value, or 50 when the property cannot be parsed as an integer
+     */
+    public int getThreadCount() {
+        try{
+            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount"));
+        } catch (NumberFormatException  e){
+            LOG.error("Exception in LogMoverService", e);
+            return DEFAULT_THREAD_COUNT;
+        }
+    }
+
+    /**
+     * Builds the thread pool. The core size is capped at the configured maximum because
+     * ThreadPoolExecutor rejects a maximum pool size smaller than the core pool size
+     * (or smaller than 1) with an IllegalArgumentException.
+     */
+    private ExecutorService createExecutor() {
+        int maxThreads = Math.max(1, getThreadCount());
+        int coreThreads = Math.min(CORE_THREAD_COUNT, maxThreads);
+        return new ThreadPoolExecutor(coreThreads, maxThreads, KEEP_ALIVE_SECONDS,
+                TimeUnit.SECONDS, blockingQueue);
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException{
+        // Nothing to move when a workflow starts
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException{
+        // Nothing to move when a workflow is suspended
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException{
+        // Nothing to move while a workflow is waiting
+    }
+
+    /**
+     * Schedules a log move for a finished workflow unless post processing is enabled
+     * (in which case this listener does nothing).
+     *
+     * @param context execution context of the finished workflow
+     */
+    private void onEnd(WorkflowExecutionContext context){
+        if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
+            return;
+        }
+        // Wait only while the work queue is FULL; waiting while it has room would block
+        // forever on an empty queue and no log mover would ever be submitted.
+        while (blockingQueue.remainingCapacity() == 0) {
+            try {
+                LOG.info("Sleeping, no capacity in threadpool....");
+                TimeUnit.MILLISECONDS.sleep(QUEUE_FULL_BACKOFF_MILLIS);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller and give up on this best-effort move
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while waiting for log mover capacity, skipping log move", e);
+                return;
+            }
+        }
+        executorService.execute(new LogMover(context));
+    }
+
+
+    /**
+     * Task that runs a {@link JobLogMover} for a single workflow execution context.
+     */
+    private static class LogMover implements Runnable {
+        private final WorkflowExecutionContext context;
+        public LogMover(@Nonnull WorkflowExecutionContext context){
+            this.context = context;
+        }
+        @Override
+        public void run(){
+            new JobLogMover().moveLog(context);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..9594c04 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -23,7 +23,6 @@ import org.apache.falcon.messaging.JMSMessageProducer;
 import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -60,7 +59,7 @@ public class FalconPostProcessing extends Configured implements Tool {
 
         // JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow
         LOG.info("Moving logs {}", context);
-        invokeLogProducer(context);
+        new JobLogMover().moveLog(context);
 
         return 0;
     }
@@ -71,19 +70,4 @@ public class FalconPostProcessing extends Configured implements Tool {
                 .build();
         jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
-
-    private void invokeLogProducer(WorkflowExecutionContext context) {
-        // todo: need to move this out to Falcon in-process
-        if (UserGroupInformation.isSecurityEnabled()) {
-            LOG.info("Unable to move logs as security is enabled.");
-            return;
-        }
-
-        try {
-            new JobLogMover().run(context);
-        } catch (Exception ignored) {
-            // Mask exception, a failed log mover will not fail the user workflow
-            LOG.error("Exception in job log mover:", ignored);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 2040748..fde5532 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -77,6 +77,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Iterator;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Tests for Oozie workflow definition for feed replication & retention.
@@ -261,6 +265,45 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testPostProcessing() throws Exception{
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "false");
+        OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
+        Path bundlePath = new Path("/projects/falcon/");
+        builder.build(trgCluster, bundlePath);
+        BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
+        List<COORDINATOR> coords = bundle.getCoordinator();
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
+
+        WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
+
+        Boolean foundUserAction = false;
+        Boolean foundPostProcessing = false;
+        Iterator<COORDINATOR> coordIterator = coords.iterator();
+
+        while(coordIterator.hasNext()){
+            COORDINATORAPP coord1 = getCoordinator(trgMiniDFS, coordIterator.next().getAppPath());
+            WORKFLOWAPP workflow1 = getWorkflowapp(trgMiniDFS.getFileSystem(), coord1);
+            Iterator<Object> workflowIterator = workflow1.getDecisionOrForkOrJoin().iterator();
+            while (workflowIterator.hasNext()){
+                Object object = workflowIterator.next();
+                if (ACTION.class.isAssignableFrom(object.getClass())){
+                    ACTION action = (ACTION) object;
+                    if (action.getName().equals("eviction") || action.getName().equals("replication")){
+                        foundUserAction = true;
+                    }
+                    if (action.getName().contains("post")){
+                        foundPostProcessing = true;
+                    }
+                }
+            }
+        }
+
+        assertTrue(foundUserAction);
+        assertFalse(foundPostProcessing);
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+    }
+
+    @Test
     public void testReplicationCoordsForFSStorage() throws Exception {
         OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
         Path bundlePath = new Path("/projects/falcon/");
@@ -697,6 +740,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Configuration conf = fs.getConf();
         conf.set("fs.permissions.umask-mode", umask);
 
+        OozieEntityBuilder feedBuilder = OozieEntityBuilder.get(feed);
+        Path bundlePath = new Path("/projects/falcon/");
+        feedBuilder.build(trgCluster, bundlePath);
+
         // ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
         setUmaskInFsConf(srcCluster, umask);
 
@@ -759,6 +806,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
     @Test (dataProvider = "secureOptions")
     public void testRetentionCoordsForTable(String secureOption) throws Exception {
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
 
         final String umask = "000";

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index a692d0c..05b513e 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -78,6 +78,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 /**
@@ -115,6 +116,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
         storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
 
+
         ConfigurationStore store = ConfigurationStore.get();
         cluster = store.get(EntityType.CLUSTER, "corp");
         org.apache.falcon.entity.v0.cluster.Property property =
@@ -785,6 +787,38 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         assertAction(parentWorkflow, "user-action", false);
     }
 
+    @Test
+    public void testPostProcessingProcess() throws Exception {
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "false");
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+
+        OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.build(cluster, bundlePath);
+        BUNDLEAPP bundle = getBundle(fs, bundlePath);
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP workflowapp = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+
+        Boolean foudUserAction = false;
+        Boolean foundpostProcessing =false;
+
+        for(Object action : workflowapp.getDecisionOrForkOrJoin()){
+            if (action instanceof ACTION && ((ACTION)action).getName().equals("user-action")){
+                foudUserAction = true;
+            }
+            if (action instanceof ACTION && ((ACTION)action).getName().contains("post")){
+                foundpostProcessing = true;
+            }
+
+        }
+        assertTrue(foudUserAction);
+        assertFalse(foundpostProcessing);
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+    }
+
     @AfterMethod
     public void cleanup() throws Exception {
         cleanupStore();

http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ef07e57..a47327a 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -123,6 +123,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
+# Optional listener; append to the list above to enable it: org.apache.falcon.service.LogMoverService
 
 ######### Implementation classes #########
 
@@ -353,3 +354,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Backlog Metric Properties
 #*.falcon.backlog.metricservice.emit.interval.millisecs=60000
 #*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Set to false to remove the post-processing action from Falcon workflows
+*.falcon.postprocessing.enable=true
+
+### LogMoveService Thread count
+*.falcon.logMoveService.threadCount=50