You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/24 07:41:30 UTC
falcon git commit: FALCON-2123 Make Blocking Queue configurable in LogMoverService
Repository: falcon
Updated Branches:
refs/heads/master 470f34313 -> d55cdd095
FALCON-2123 Make Blocking Queue configurable in LogMoverService
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: @pallavi-rao @sandeepSamudrala
Closes #272 from PraveenAdlakha/2123 and squashes the following commits:
d4fb9b9 [Praveen Adlakha] thread count changed in startup.properties
fd0e244 [Praveen Adlakha] 2125 commit
1342d9d [Praveen Adlakha] FALCON-2123 Make Blocking Queue configurable in LogMoverService
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d55cdd09
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d55cdd09
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d55cdd09
Branch: refs/heads/master
Commit: d55cdd0951bc2c5344ce5e9493c290e58d85234a
Parents: 470f343
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Wed Aug 24 13:11:23 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Aug 24 13:11:23 2016 +0530
----------------------------------------------------------------------
common/src/main/resources/startup.properties | 5 +++--
.../falcon/oozie/OozieOrchestrationWorkflowBuilder.java | 10 +++++++++-
.../falcon/oozie/feed/FSReplicationWorkflowBuilder.java | 2 +-
.../falcon/oozie/feed/HCatReplicationWorkflowBuilder.java | 8 ++++----
.../oozie/process/ProcessExecutionWorkflowBuilder.java | 2 +-
.../java/org/apache/falcon/service/LogMoverService.java | 7 ++++---
src/conf/startup.properties | 5 +++--
7 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 6c2ab5c..8d64c54 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -350,5 +350,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Property to remove postProcessing
*.falcon.postprocessing.enable=true
-### LogMoveService Thread count
-*.falcon.logMoveService.threadCount=50
+### LogMoveService Properties
+*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.blockingQueue.length=50
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 9683e62..8d45d7a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -230,12 +230,20 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
wf.getDecisionOrForkOrJoin().add(kill);
}
+ protected String getFailAction(){
+ if (!isPostProcessingEnabled()){
+ return FAIL_ACTION_NAME;
+ }else{
+ return FAIL_POSTPROCESS_ACTION_NAME;
+ }
+ }
+
protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{
if (!isPostProcessingEnabled()){
addTransition(action, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(action);
}else{
- addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(action);
//Add post-processing actions
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 598cf6f..7cad507 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -48,7 +48,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
if (shouldPreProcess()) {
ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
addHDFSServersConfig(action, src, target);
- addTransition(action, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(action, REPLICATION_ACTION_NAME, getFailAction());
workflow.getDecisionOrForkOrJoin().add(action);
start = PREPROCESS_ACTION_NAME;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index f4eecb7..ba86c6e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -73,7 +73,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
action.getJava().getConfiguration().getProperty().add(prop);
}
addHDFSServersConfig(action, src, target);
- addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(action, EXPORT_ACTION_NAME, getFailAction());
workflow.getDecisionOrForkOrJoin().add(action);
start = PREPROCESS_ACTION_NAME;
}
@@ -95,14 +95,14 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
hiveExportAction.getConfiguration().getProperty().add(prop);
}
OozieUtils.marshalHiveAction(export, exportActionJaxbElement);
- addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(export, REPLICATION_ACTION_NAME, getFailAction());
workflow.getDecisionOrForkOrJoin().add(export);
//Add replication
ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
addHDFSServersConfig(replication, src, target);
addAdditionalReplicationProperties(replication);
- addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(replication, IMPORT_ACTION_NAME, getFailAction());
workflow.getDecisionOrForkOrJoin().add(replication);
//Add import action
@@ -122,7 +122,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
hiveImportAction.getConfiguration().getProperty().add(prop);
}
OozieUtils.marshalHiveAction(importAction, importActionJaxbElement);
- addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(importAction, CLEANUP_ACTION_NAME, getFailAction());
workflow.getDecisionOrForkOrJoin().add(importAction);
//Add cleanup action
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index c31b4ee..20eeffd 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -79,7 +79,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
//Add pre-processing action
if (shouldPreProcess()) {
ACTION preProcessAction = getPreProcessingAction(isTableStorageType, Tag.DEFAULT);
- addTransition(preProcessAction, USER_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ addTransition(preProcessAction, USER_ACTION_NAME, getFailAction());
wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
startAction = PREPROCESS_ACTION_NAME;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
index 1f3d0a0..ba4835d 100644
--- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -44,12 +44,13 @@ public class LogMoverService implements WorkflowExecutionListener {
public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
getProperty("falcon.postprocessing.enable");
- private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
+ private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt(
+ StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50")));
private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120,
TimeUnit.SECONDS, blockingQueue);
public int getThreadCount() {
try{
- return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount"));
+ return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200"));
} catch (NumberFormatException e){
LOG.error("Exception in LogMoverService", e);
return 50;
@@ -87,7 +88,7 @@ public class LogMoverService implements WorkflowExecutionListener {
}
while(0<blockingQueue.remainingCapacity()){
try {
- LOG.info("Sleeing, no capacity in threadpool....");
+ LOG.debug("Sleeping, no capacity in threadpool....");
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 37e37bc..b663f04 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -367,5 +367,6 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Property to remove postProcessing
*.falcon.postprocessing.enable=true
-### LogMoveService Thread count
-*.falcon.logMoveService.threadCount=50
+### LogMoveService Properties
+*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.blockingQueue.length=50