You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/26 23:31:52 UTC
samza git commit: SAMZA-1309; Debounce time config
Repository: samza
Updated Branches:
refs/heads/master 8f1609d0b -> 0c05cb344
SAMZA-1309; Debounce time config
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #203 from sborya/DebounceConfig
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0c05cb34
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0c05cb34
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0c05cb34
Branch: refs/heads/master
Commit: 0c05cb34415cf44d4f547e1986b803c100930db6
Parents: 8f1609d
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri May 26 16:31:45 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri May 26 16:31:45 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 8 ++++++++
.../samza/zk/ScheduleAfterDebounceTime.java | 2 --
.../org/apache/samza/zk/ZkJobCoordinator.java | 20 +++++++++++++-------
.../org/apache/samza/config/JobConfig.scala | 4 ++++
4 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0c05cb34/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 3eea37d..7a1b56e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -462,9 +462,17 @@
</td>
</tr>
<tr>
+ <td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
+ <td class="default"> 2000 </td>
+ <td class="description">
+ How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
+ </td>
+ </tr>
+ <tr>
<th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
</tr>
+
<tr>
<td class="property" id="task-class">task.class</td>
<td class="default"></td>
http://git-wip-us.apache.org/repos/asf/samza/blob/0c05cb34/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 5cfd37a..9b8ea66 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -48,8 +48,6 @@ public class ScheduleAfterDebounceTime {
// Action name when the Processor membership changes
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
- public static final int DEBOUNCE_TIME_MS = 2000;
-
private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
http://git-wip-us.apache.org/repos/asf/samza/blob/0c05cb34/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 64395ac..5e46ce5 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,9 +18,13 @@
*/
package org.apache.samza.zk;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -34,10 +38,6 @@ import org.apache.samza.util.ClassLoaderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
/**
* JobCoordinator for stand alone processor managed via Zookeeper.
*/
@@ -47,6 +47,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
private static final int METADATA_CACHE_TTL_MS = 5000;
+
private final ZkUtils zkUtils;
private final String processorId;
private final ZkController zkController;
@@ -59,6 +60,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
+ private int debounceTimeMs;
+
public ZkJobCoordinator(Config config) {
this.config = config;
ZkConfig zkConfig = new ZkConfig(config);
@@ -79,11 +82,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
keyBuilder.getJobModelVersionBarrierPrefix(),
zkUtils,
new ZkBarrierListenerImpl());
+ this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
+
}
@Override
public void start() {
streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+
debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
LOG.error("Received exception from in JobCoordinator Processing!", throwable);
stop();
@@ -126,7 +132,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
public void onProcessorChange(List<String> processors) {
LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
- ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
+ debounceTimeMs, () -> doOnProcessorChange(processors));
}
void doOnProcessorChange(List<String> processors) {
@@ -232,8 +238,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
zkController.subscribeToProcessorChange();
debounceTimer.scheduleAfterDebounceTime(
ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
- ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> {
- // actual actions to do are the same as onProcessorChange()
+ debounceTimeMs, () -> {
+ // actual actions to do are the same as onProcessorChange
doOnProcessorChange(new ArrayList<>());
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/0c05cb34/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 030d945..2545194 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -47,6 +47,8 @@ object JobConfig {
val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
+ val JOB_DEBOUNCE_TIME_MS = "job.debounce.time.ms"
+ val DEFAULT_DEBOUNCE_TIME_MS = 2000
val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
@@ -172,4 +174,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(mode) => mode.toBoolean
case _ => false
}
+
+ def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS)
}