You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/06/01 01:49:57 UTC

[3/5] samza git commit: SAMZA-1309; Debounce time config

SAMZA-1309; Debounce time config

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #203 from sborya/DebounceConfig


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1ffcbc2c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1ffcbc2c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1ffcbc2c

Branch: refs/heads/0.13.0
Commit: 1ffcbc2ceab3d3169f7be456569f188c52d2ab2f
Parents: a31010c
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 26 16:31:45 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed May 31 18:11:38 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  8 ++++++++
 .../samza/zk/ScheduleAfterDebounceTime.java     |  2 --
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 20 +++++++++++++-------
 .../org/apache/samza/config/JobConfig.scala     |  4 ++++
 4 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1ffcbc2c/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 3eea37d..7a1b56e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -462,9 +462,17 @@
                     </td>
                 </tr>
                 <tr>
+                    <td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
+                    <td class="default"> 2000 </td>
+                    <td class="description">
+                        How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
+                    </td>
+                </tr>
+                <tr>
                     <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
                 </tr>
 
+
                 <tr>
                     <td class="property" id="task-class">task.class</td>
                     <td class="default"></td>

http://git-wip-us.apache.org/repos/asf/samza/blob/1ffcbc2c/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 5cfd37a..9b8ea66 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -48,8 +48,6 @@ public class ScheduleAfterDebounceTime {
   // Action name when the Processor membership changes
   public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
 
-  public static final int DEBOUNCE_TIME_MS = 2000;
-
   private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
 
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(

http://git-wip-us.apache.org/repos/asf/samza/blob/1ffcbc2c/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 64395ac..5e46ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,9 +18,13 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -34,10 +38,6 @@ import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -47,6 +47,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
   private static final int METADATA_CACHE_TTL_MS = 5000;
 
+
   private final ZkUtils zkUtils;
   private final String processorId;
   private final ZkController zkController;
@@ -59,6 +60,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
 
+  private int debounceTimeMs;
+
   public ZkJobCoordinator(Config config) {
     this.config = config;
     ZkConfig zkConfig = new ZkConfig(config);
@@ -79,11 +82,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         keyBuilder.getJobModelVersionBarrierPrefix(),
         zkUtils,
         new ZkBarrierListenerImpl());
+    this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
+
   }
 
   @Override
   public void start() {
     streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+
     debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
@@ -126,7 +132,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   public void onProcessorChange(List<String> processors) {
     LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
+        debounceTimeMs, () -> doOnProcessorChange(processors));
   }
 
   void doOnProcessorChange(List<String> processors) {
@@ -232,8 +238,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       zkController.subscribeToProcessorChange();
       debounceTimer.scheduleAfterDebounceTime(
         ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> {
-          // actual actions to do are the same as onProcessorChange()
+        debounceTimeMs, () -> {
+          // actual actions to do are the same as onProcessorChange
           doOnProcessorChange(new ArrayList<>());
         });
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1ffcbc2c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 030d945..2545194 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -47,6 +47,8 @@ object JobConfig {
   val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
   val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
+  val JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms"
+  val DEFAULT_DEBOUNCE_TIME_MS = 2000
 
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 
@@ -172,4 +174,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
+
+  def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS)
 }