You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/08/24 07:41:30 UTC

falcon git commit: FALCON-2123 Make Blocking Queue configurable in LogMoverService

Repository: falcon
Updated Branches:
  refs/heads/master 470f34313 -> d55cdd095


FALCON-2123 Make Blocking Queue configurable in LogMoverService

Author: Praveen Adlakha <ad...@gmail.com>

Reviewers: @pallavi-rao @sandeepSamudrala

Closes #272 from PraveenAdlakha/2123 and squashes the following commits:

d4fb9b9 [Praveen Adlakha] thread count changed in startup.properties
fd0e244 [Praveen Adlakha] 2125 commit
1342d9d [Praveen Adlakha] FALCON-2123 Make Blocking Queue configurable in LogMoverService


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d55cdd09
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d55cdd09
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d55cdd09

Branch: refs/heads/master
Commit: d55cdd0951bc2c5344ce5e9493c290e58d85234a
Parents: 470f343
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Wed Aug 24 13:11:23 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Wed Aug 24 13:11:23 2016 +0530

----------------------------------------------------------------------
 common/src/main/resources/startup.properties              |  5 +++--
 .../falcon/oozie/OozieOrchestrationWorkflowBuilder.java   | 10 +++++++++-
 .../falcon/oozie/feed/FSReplicationWorkflowBuilder.java   |  2 +-
 .../falcon/oozie/feed/HCatReplicationWorkflowBuilder.java |  8 ++++----
 .../oozie/process/ProcessExecutionWorkflowBuilder.java    |  2 +-
 .../java/org/apache/falcon/service/LogMoverService.java   |  7 ++++---
 src/conf/startup.properties                               |  5 +++--
 7 files changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 6c2ab5c..8d64c54 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -350,5 +350,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Property to remove postProcessing
 *.falcon.postprocessing.enable=true
 
-### LogMoveService Thread count
-*.falcon.logMoveService.threadCount=50
+### LogMoveService Properties
+*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.blockingQueue.length=50

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 9683e62..8d45d7a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -230,12 +230,20 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         wf.getDecisionOrForkOrJoin().add(kill);
     }
 
+    protected String getFailAction(){
+        if (!isPostProcessingEnabled()){
+            return FAIL_ACTION_NAME;
+        }else{
+            return FAIL_POSTPROCESS_ACTION_NAME;
+        }
+    }
+
     protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException{
         if (!isPostProcessingEnabled()){
             addTransition(action, OK_ACTION_NAME, FAIL_ACTION_NAME);
             workflow.getDecisionOrForkOrJoin().add(action);
         }else{
-            addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME,  FAIL_POSTPROCESS_ACTION_NAME);
             workflow.getDecisionOrForkOrJoin().add(action);
 
             //Add post-processing actions

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 598cf6f..7cad507 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -48,7 +48,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder
         if (shouldPreProcess()) {
             ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
             addHDFSServersConfig(action, src, target);
-            addTransition(action, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            addTransition(action, REPLICATION_ACTION_NAME, getFailAction());
             workflow.getDecisionOrForkOrJoin().add(action);
             start = PREPROCESS_ACTION_NAME;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index f4eecb7..ba86c6e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -73,7 +73,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
                 action.getJava().getConfiguration().getProperty().add(prop);
             }
             addHDFSServersConfig(action, src, target);
-            addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            addTransition(action, EXPORT_ACTION_NAME, getFailAction());
             workflow.getDecisionOrForkOrJoin().add(action);
             start = PREPROCESS_ACTION_NAME;
         }
@@ -95,14 +95,14 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
             hiveExportAction.getConfiguration().getProperty().add(prop);
         }
         OozieUtils.marshalHiveAction(export, exportActionJaxbElement);
-        addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        addTransition(export, REPLICATION_ACTION_NAME, getFailAction());
         workflow.getDecisionOrForkOrJoin().add(export);
 
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
         addHDFSServersConfig(replication, src, target);
         addAdditionalReplicationProperties(replication);
-        addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        addTransition(replication, IMPORT_ACTION_NAME, getFailAction());
         workflow.getDecisionOrForkOrJoin().add(replication);
 
         //Add import action
@@ -122,7 +122,7 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
             hiveImportAction.getConfiguration().getProperty().add(prop);
         }
         OozieUtils.marshalHiveAction(importAction, importActionJaxbElement);
-        addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        addTransition(importAction, CLEANUP_ACTION_NAME, getFailAction());
         workflow.getDecisionOrForkOrJoin().add(importAction);
 
         //Add cleanup action

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index c31b4ee..20eeffd 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -79,7 +79,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         //Add pre-processing action
         if (shouldPreProcess()) {
             ACTION preProcessAction = getPreProcessingAction(isTableStorageType, Tag.DEFAULT);
-            addTransition(preProcessAction, USER_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+            addTransition(preProcessAction, USER_ACTION_NAME, getFailAction());
             wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
             startAction = PREPROCESS_ACTION_NAME;
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
index 1f3d0a0..ba4835d 100644
--- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -44,12 +44,13 @@ public class LogMoverService implements WorkflowExecutionListener  {
     public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
             getProperty("falcon.postprocessing.enable");
 
-    private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(50);
+    private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt(
+            StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50")));
     private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120,
             TimeUnit.SECONDS, blockingQueue);
     public int getThreadCount() {
         try{
-            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount"));
+            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200"));
         } catch (NumberFormatException  e){
             LOG.error("Exception in LogMoverService", e);
             return 50;
@@ -87,7 +88,7 @@ public class LogMoverService implements WorkflowExecutionListener  {
         }
         while(0<blockingQueue.remainingCapacity()){
             try {
-                LOG.info("Sleeing, no capacity in threadpool....");
+                LOG.debug("Sleeping, no capacity in threadpool....");
                 TimeUnit.MILLISECONDS.sleep(500);
             } catch (InterruptedException e) {
                 e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/falcon/blob/d55cdd09/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 37e37bc..b663f04 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -367,5 +367,6 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Property to remove postProcessing
 *.falcon.postprocessing.enable=true
 
-### LogMoveService Thread count
-*.falcon.logMoveService.threadCount=50
+### LogMoveService Properties
+*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.blockingQueue.length=50