You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/05 04:25:55 UTC
falcon git commit: FALCON-2039 Move falcon post processing to falcon
server and remove post processing
Repository: falcon
Updated Branches:
refs/heads/master 723f1f7f1 -> 7d9687bcb
FALCON-2039 Move falcon post processing to falcon server and remove post processing
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: @pallavi-rao, @vrangan
Closes #244 from PraveenAdlakha/2039 and squashes the following commits:
b71290a [Praveen Adlakha] process removed
26ddc02 [Praveen Adlakha] comments addressed
d4f4cf8 [Praveen Adlakha] fixed test cases
cdf9ae1 [Praveen Adlakha] documentation added
9aee018 [Praveen Adlakha] multithread added
b3115de [Praveen Adlakha] Agebased boolean check removed
22c25a3 [Praveen Adlakha] Startup properties
1bc278f [Praveen Adlakha] minor fixes done
71b8d1a [Praveen Adlakha] check added in service
dd58642 [Praveen Adlakha] FALCON-2039 Move falcon post processing to falcon server and remove post processing action from falcon workflow
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7d9687bc
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7d9687bc
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7d9687bc
Branch: refs/heads/master
Commit: 7d9687bcbc8a978ad361498084f3e473fd69fc9b
Parents: 723f1f7
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Fri Aug 5 09:55:49 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Aug 5 09:55:49 2016 +0530
----------------------------------------------------------------------
common/src/main/resources/runtime.properties | 2 +-
common/src/main/resources/startup.properties | 7 ++
docs/src/site/twiki/Configuration.twiki | 14 +++
.../retention/AgeBasedWorkflowBuilder.java | 32 ++++--
.../engine/oozie/utils/OozieBuilderUtils.java | 2 +
oozie/pom.xml | 1 -
.../org/apache/falcon/logging/JobLogMover.java | 14 +++
.../oozie/DatabaseExportWorkflowBuilder.java | 13 +--
.../oozie/DatabaseImportWorkflowBuilder.java | 13 +--
.../OozieOrchestrationWorkflowBuilder.java | 30 +++++
.../feed/FSReplicationWorkflowBuilder.java | 15 +--
.../feed/FeedRetentionWorkflowBuilder.java | 13 +--
.../feed/HCatReplicationWorkflowBuilder.java | 15 +--
.../ProcessExecutionWorkflowBuilder.java | 12 +-
.../apache/falcon/service/LogMoverService.java | 111 +++++++++++++++++++
.../falcon/workflow/FalconPostProcessing.java | 18 +--
.../feed/OozieFeedWorkflowBuilderTest.java | 48 ++++++++
.../OozieProcessWorkflowBuilderTest.java | 34 ++++++
src/conf/startup.properties | 7 ++
19 files changed, 295 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 643559e..ba4c055 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -51,4 +51,4 @@
*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
-######### Proxyuser Configuration End #########
\ No newline at end of file
+######### Proxyuser Configuration End #########
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 4b692a2..9207b25 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -83,6 +83,7 @@
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=
+#org.apache.falcon.service.LogMoverService
######### Implementation classes #########
@@ -336,3 +337,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Backlog Metric Properties
#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Set to false to drop the post-processing actions from generated Oozie workflows
+*.falcon.postprocessing.enable=true
+
+### Maximum number of LogMoverService threads
+*.falcon.logMoveService.threadCount=50
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index ce32019..c686d48 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -317,6 +317,20 @@ su - $OOZIE_USER
Where $OOZIE_USER is the Oozie user. For example, oozie.
</verbatim>
+---+++Disabling Falcon Post Processing
+Falcon post processing performs two tasks:
+It sends user notifications to ActiveMQ.
+It moves Oozie executor logs once the workflow finishes.
+
+If post processing fails for any reason, users might end up with a backlog in the pipeline; that is why it has been made optional.
+
+To disable post processing, set the following property to false in startup.properties and register LogMoverService as a workflow execution listener so that logs are still moved:
+<verbatim>
+*.falcon.postprocessing.enable=false
+*.workflow.execution.listeners=org.apache.falcon.service.LogMoverService
+</verbatim>
+*NOTE: Please make sure Oozie JMS notifications are enabled, as LogMoverService depends on Oozie JMS notifications.*
+
---+++Enabling Falcon Native Scheudler
You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
index 34b186e..dd0c6d2 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -59,19 +59,27 @@ public final class AgeBasedWorkflowBuilder {
//Add eviction action
ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
- OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
- OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(eviction);
-
- //Add post-processing actions
- ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
- OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
- OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
+ if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)){ // property unset or not "true": no post-processing actions are added
+ OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.OK_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(eviction);
+ } else { // post-processing enabled: eviction transitions into the succeeded/failed post-processing actions
+ OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+ OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(eviction);
+
+ //Add post-processing actions
+ ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+ OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+ OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME,
+ OozieBuilderUtils.FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(fail);
+ }
OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 8f1b53b..7d51c9a 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -102,6 +102,8 @@ public final class OozieBuilderUtils {
public static final String ENTITY_PATH = "ENTITY_PATH";
public static final String ENTITY_NAME = "ENTITY_NAME";
public static final String IGNORE = "IGNORE";
+ public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+ getProperty("falcon.postprocessing.enable"); // static: read once when the class loads; presumably null if the key is unset
static {
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 7bfb086..2adbba3 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -105,7 +105,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
-
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 64596c6..6ec2a20 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
@@ -68,6 +69,19 @@ public class JobLogMover {
return conf == null ? new Configuration(): conf;
}
+    public void moveLog(WorkflowExecutionContext context){
+        if (!UserGroupInformation.isSecurityEnabled()) {
+            try {
+                run(context);
+            } catch (Exception ignored) {
+                // Best effort: a failed log move must never fail the calling workflow or listener.
+                LOG.error("Exception in job log mover:", ignored);
+            }
+            return;
+        }
+        LOG.info("Unable to move logs as security is enabled.");
+    }
+
public int run(WorkflowExecutionContext context) {
try {
OozieClient client = new OozieClient(context.getWorkflowEngineUrl());
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
index 6468415..b86afaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
@@ -67,18 +67,7 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopExport);
OozieUtils.marshalSqoopAction(action, actionJaxbElement);
- addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(action);
-
- //Add post-processing actions
- ACTION success = getSuccessPostProcessAction();
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = getFailPostProcessAction();
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
-
+ addPostProcessing(workflow, action); // adds the export action; chains post-processing actions only when enabled
decorateWorkflow(workflow, workflow.getName(), EXPORT_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
index 44562f2..5c95162 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -69,18 +69,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopImport);
OozieUtils.marshalSqoopAction(action, actionJaxbElement);
- addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(action);
-
- //Add post-processing actions
- ACTION success = getSuccessPostProcessAction();
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = getFailPostProcessAction();
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
-
+ addPostProcessing(workflow, action); // adds the import action; chains post-processing actions only when enabled
decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 5ad3d03..9683e62 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -80,6 +80,13 @@ import java.util.Set;
public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    /** Raw falcon.postprocessing.enable value, read when this builder is created; may be null. */
+    private final String enablePostprocessing = StartupProperties.get().getProperty("falcon.postprocessing.enable");
+
+    public String getEnablePostProcessing() {
+        return enablePostprocessing;
+    }
+
protected static final String USER_ACTION_NAME = "user-action";
protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
@@ -130,6 +137,10 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return get(entity, cluster, lifecycle, Scheduler.OOZIE);
}
+    public Boolean isPostProcessingEnabled(){
+        return Boolean.valueOf(getEnablePostProcessing()); // true only for a case-insensitive "true"; null means false
+    }
+
public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
Scheduler scheduler)
throws FalconException {
@@ -219,6 +230,25 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
wf.getDecisionOrForkOrJoin().add(kill);
}
+    protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{
+        boolean withPostProcessing = isPostProcessingEnabled();
+        // Disabled: the action transitions straight to OK/FAIL. Enabled: it transitions into the
+        // succeeded/failed post-processing actions, which in turn transition to OK/FAIL.
+        String onOk = withPostProcessing ? SUCCESS_POSTPROCESS_ACTION_NAME : OK_ACTION_NAME;
+        String onError = withPostProcessing ? FAIL_POSTPROCESS_ACTION_NAME : FAIL_ACTION_NAME;
+        addTransition(action, onOk, onError);
+        workflow.getDecisionOrForkOrJoin().add(action);
+        if (!withPostProcessing) {
+            return;
+        }
+        ACTION onSuccess = getSuccessPostProcessAction();
+        addTransition(onSuccess, OK_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(onSuccess);
+        ACTION onFailure = getFailPostProcessAction();
+        addTransition(onFailure, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(onFailure);
+    }
+
protected ACTION getSuccessPostProcessAction() throws FalconException {
ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
decorateWithOozieRetries(action);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index cfcc698..598cf6f 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -59,20 +59,18 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
addAdditionalReplicationProperties(replication);
enableCounters(replication);
enableTDE(replication);
- addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(replication);
-
- //Add post-processing actions
- ACTION success = getSuccessPostProcessAction();
- addHDFSServersConfig(success, src, target);
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = getFailPostProcessAction();
- addHDFSServersConfig(fail, src, target);
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, replication);
+        // addPostProcessing() has no access to src/target, so apply the HDFS servers config to the
+        // post-processing actions here (a no-op when post-processing is disabled and none were added).
+        for (Object wfNode : workflow.getDecisionOrForkOrJoin()) {
+            if (wfNode instanceof ACTION) {
+                ACTION postProcessAction = (ACTION) wfNode;
+                if (SUCCESS_POSTPROCESS_ACTION_NAME.equals(postProcessAction.getName())
+                        || FAIL_POSTPROCESS_ACTION_NAME.equals(postProcessAction.getName())) {
+                    addHDFSServersConfig(postProcessAction, src, target);
+                }
+            }
+        }
decorateWorkflow(workflow, wfName, start);
return workflow;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index b9e3848..fd51ed0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -51,21 +51,10 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
WORKFLOWAPP workflow = new WORKFLOWAPP();
String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
-
//Add eviction action
ACTION eviction = unmarshalAction(EVICTION_ACTION_TEMPLATE);
- addTransition(eviction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(eviction);
-
- //Add post-processing actions
- ACTION success = getSuccessPostProcessAction();
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = getFailPostProcessAction();
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
+ addPostProcessing(workflow, eviction); // adds the eviction action; chains post-processing actions only when enabled
decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 3da97d3..f4eecb7 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -127,20 +127,18 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
//Add cleanup action
ACTION cleanup = unmarshalAction(CLEANUP_ACTION_TEMPLATE);
- addTransition(cleanup, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(cleanup);
-
- //Add post-processing actions
- ACTION success = getSuccessPostProcessAction();
- addHDFSServersConfig(success, src, target);
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(success);
-
- ACTION fail = getFailPostProcessAction();
- addHDFSServersConfig(fail, src, target);
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- workflow.getDecisionOrForkOrJoin().add(fail);
-
+        addPostProcessing(workflow, cleanup);
+        // addPostProcessing() has no access to src/target, so apply the HDFS servers config to the
+        // post-processing actions here (a no-op when post-processing is disabled and none were added).
+        for (Object wfNode : workflow.getDecisionOrForkOrJoin()) {
+            if (wfNode instanceof ACTION) {
+                ACTION postProcessAction = (ACTION) wfNode;
+                if (SUCCESS_POSTPROCESS_ACTION_NAME.equals(postProcessAction.getName())
+                        || FAIL_POSTPROCESS_ACTION_NAME.equals(postProcessAction.getName())) {
+                    addHDFSServersConfig(postProcessAction, src, target);
+                }
+            }
+        }
decorateWorkflow(workflow, wfName, start);
setupHiveCredentials(src, target, workflow);
return workflow;
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 5d2c43e..c31b4ee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -83,20 +83,10 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
startAction = PREPROCESS_ACTION_NAME;
}
-
//Add user action
ACTION userAction = getUserAction(cluster, buildPath);
- addTransition(userAction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
- wfApp.getDecisionOrForkOrJoin().add(userAction);
-
- //Add post-processing
- ACTION success = getSuccessPostProcessAction();
- addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
- wfApp.getDecisionOrForkOrJoin().add(success);
- ACTION fail = getFailPostProcessAction();
- addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
- wfApp.getDecisionOrForkOrJoin().add(fail);
+ addPostProcessing(wfApp, userAction); // adds the user action; chains post-processing actions only when enabled
decorateWorkflow(wfApp, wfName, startAction);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
new file mode 100644
index 0000000..1f3d0a0
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.logging.JobLogMover;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Workflow execution listener that moves the Oozie job logs once a workflow succeeds or fails.
+ * It does nothing while falcon.postprocessing.enable is "true", since the workflow's own
+ * post-processing action moves the logs then. Moves run asynchronously on a bounded pool.
+ */
+public class LogMoverService implements WorkflowExecutionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogMoverService.class);
+
+    public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
+            getProperty("falcon.postprocessing.enable");
+
+    private static final int DEFAULT_THREAD_COUNT = 50;
+    private static final int CORE_THREAD_COUNT = 20;
+    private static final long RETRY_INTERVAL_MILLIS = 500;
+
+    private final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
+    private final ExecutorService executorService = createExecutor();
+
+    private ExecutorService createExecutor() {
+        int maxThreads = getThreadCount();
+        // ThreadPoolExecutor rejects a maximum pool size below the core size, so cap the core size.
+        return new ThreadPoolExecutor(Math.min(CORE_THREAD_COUNT, maxThreads), maxThreads, 120,
+                TimeUnit.SECONDS, blockingQueue);
+    }
+
+    /**
+     * Returns falcon.logMoveService.threadCount, or 50 when it is unset, unparsable or not positive.
+     */
+    public int getThreadCount() {
+        try {
+            int count = Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount"));
+            if (count > 0) {
+                return count;
+            }
+            LOG.error("Invalid falcon.logMoveService.threadCount {}, using {}", count, DEFAULT_THREAD_COUNT);
+        } catch (NumberFormatException e) {
+            LOG.error("Exception in LogMoverService", e);
+        }
+        return DEFAULT_THREAD_COUNT;
+    }
+
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException{
+        onEnd(context);
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException{
+        //Do Nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException{
+        //DO Nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException{
+        //DO Nothing
+    }
+
+    /**
+     * Queues the log move for a finished workflow, retrying every 500 ms while the pool is saturated.
+     */
+    private void onEnd(WorkflowExecutionContext context){
+        if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
+            return;
+        }
+        LogMover task = new LogMover(context);
+        while (true) {
+            try {
+                // Rejected only when every thread is busy and the queue is full. Submitting directly,
+                // rather than polling the queue's remaining capacity first, cannot race other callers.
+                executorService.execute(task);
+                return;
+            } catch (RejectedExecutionException e) {
+                LOG.info("Sleeing, no capacity in threadpool....");
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MILLIS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOG.error("Interrupted while waiting to move logs for {}", context, e);
+                return;
+            }
+        }
+    }
+
+    /** Moves the logs of one finished workflow; failures inside JobLogMover.run are logged there. */
+    private static class LogMover implements Runnable {
+        private final WorkflowExecutionContext context;
+
+        public LogMover(@Nonnull WorkflowExecutionContext context){
+            this.context = context;
+        }
+
+        @Override
+        public void run(){
+            new JobLogMover().moveLog(context);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..9594c04 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -23,7 +23,6 @@ import org.apache.falcon.messaging.JMSMessageProducer;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
@@ -60,7 +59,7 @@ public class FalconPostProcessing extends Configured implements Tool {
// JobLogMover doesn't throw exception, a failed log mover will not fail the user workflow
LOG.info("Moving logs {}", context);
- invokeLogProducer(context);
+ new JobLogMover().moveLog(context); // skipped when security is enabled; exceptions from run() are logged, not thrown
return 0;
}
@@ -71,19 +70,4 @@ public class FalconPostProcessing extends Configured implements Tool {
.build();
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}
-
- private void invokeLogProducer(WorkflowExecutionContext context) {
- // todo: need to move this out to Falcon in-process
- if (UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Unable to move logs as security is enabled.");
- return;
- }
-
- try {
- new JobLogMover().run(context);
- } catch (Exception ignored) {
- // Mask exception, a failed log mover will not fail the user workflow
- LOG.error("Exception in job log mover:", ignored);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 2040748..fde5532 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -77,6 +77,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Iterator;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
/**
* Tests for Oozie workflow definition for feed replication & retention.
@@ -261,6 +265,45 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
}
@Test
+    public void testPostProcessing() throws Exception{
+        StartupProperties.get().setProperty("falcon.postprocessing.enable", "false");
+        try {
+            OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
+            Path bundlePath = new Path("/projects/falcon/");
+            builder.build(trgCluster, bundlePath);
+            BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
+            List<COORDINATOR> coords = bundle.getCoordinator();
+
+            boolean foundUserAction = false;
+            boolean foundPostProcessing = false;
+            // Scan the workflow of every coordinator in the bundle (replication and retention).
+            Iterator<COORDINATOR> coordIterator = coords.iterator();
+            while (coordIterator.hasNext()) {
+                COORDINATORAPP coordApp = getCoordinator(trgMiniDFS, coordIterator.next().getAppPath());
+                WORKFLOWAPP workflowApp = getWorkflowapp(trgMiniDFS.getFileSystem(), coordApp);
+                Iterator<Object> nodeIterator = workflowApp.getDecisionOrForkOrJoin().iterator();
+                while (nodeIterator.hasNext()) {
+                    Object node = nodeIterator.next();
+                    if (node instanceof ACTION) {
+                        String actionName = ((ACTION) node).getName();
+                        if (actionName.equals("eviction") || actionName.equals("replication")) {
+                            foundUserAction = true;
+                        }
+                        if (actionName.contains("post")) {
+                            foundPostProcessing = true;
+                        }
+                    }
+                }
+            }
+            assertTrue(foundUserAction);
+            assertFalse(foundPostProcessing);
+        } finally {
+            // Restore the default so a failed assertion cannot leak "false" into later tests.
+            StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+        }
+    }
+
+ @Test
public void testReplicationCoordsForFSStorage() throws Exception {
OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
Path bundlePath = new Path("/projects/falcon/");
@@ -697,6 +740,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Configuration conf = fs.getConf();
conf.set("fs.permissions.umask-mode", umask);
+ OozieEntityBuilder feedBuilder = OozieEntityBuilder.get(feed); // NOTE(review): why this extra bundle build is needed is not visible in this hunk — confirm
+ Path bundlePath = new Path("/projects/falcon/");
+ feedBuilder.build(trgCluster, bundlePath);
+
// ClusterHelper constructs new fs Conf. Add it to cluster properties so that it gets added to FS conf
setUmaskInFsConf(srcCluster, umask);
@@ -759,6 +806,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
@Test (dataProvider = "secureOptions")
public void testRetentionCoordsForTable(String secureOption) throws Exception {
+ StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
final String umask = "000";
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index a692d0c..05b513e 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -78,6 +78,7 @@ import java.util.Map;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
/**
@@ -115,6 +116,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
+
ConfigurationStore store = ConfigurationStore.get();
cluster = store.get(EntityType.CLUSTER, "corp");
org.apache.falcon.entity.v0.cluster.Property property =
@@ -785,6 +787,38 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
assertAction(parentWorkflow, "user-action", false);
}
+ @Test
+ public void testPostProcessingProcess() throws Exception {
+ StartupProperties.get().setProperty("falcon.postprocessing.enable", "false");
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+
+ OozieEntityBuilder builder = OozieEntityBuilder.get(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.build(cluster, bundlePath);
+ BUNDLEAPP bundle = getBundle(fs, bundlePath);
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+ COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP workflowapp = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
+
+ Boolean foudUserAction = false;
+ Boolean foundpostProcessing =false;
+
+ for(Object action : workflowapp.getDecisionOrForkOrJoin()){
+ if (action instanceof ACTION && ((ACTION)action).getName().equals("user-action")){
+ foudUserAction = true;
+ }
+ if (action instanceof ACTION && ((ACTION)action).getName().contains("post")){
+ foundpostProcessing = true;
+ }
+
+ }
+ assertTrue(foudUserAction);
+ assertFalse(foundpostProcessing);
+ StartupProperties.get().setProperty("falcon.postprocessing.enable", "true");
+ }
+
@AfterMethod
public void cleanup() throws Exception {
cleanupStore();
http://git-wip-us.apache.org/repos/asf/falcon/blob/7d9687bc/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ef07e57..a47327a 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -123,6 +123,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
+#org.apache.falcon.service.LogMoverService
######### Implementation classes #########
@@ -353,3 +354,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Backlog Metric Properties
#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
+
+# Enables the post-processing action in Falcon-generated workflows; set to false to remove it
+*.falcon.postprocessing.enable=true
+
+### Number of threads used by the LogMoverService
+*.falcon.logMoveService.threadCount=50