You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/02/16 01:17:15 UTC
samza git commit: SAMZA-1087: Schedule after debounce time
Repository: samza
Updated Branches:
refs/heads/master 2226e3e71 -> 09bf8339c
SAMZA-1087: Schedule after debounce time
SAMZA-1087: Allows scheduling an action (a Runnable) after some de-bounce delay.
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@linkedin.com>
Closes #49 from sborya/ScheduleAfterDebounceTime1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/09bf8339
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/09bf8339
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/09bf8339
Branch: refs/heads/master
Commit: 09bf8339ce7ed893c3a2971d49c91fda845f1675
Parents: 2226e3e
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Feb 15 17:17:01 2017 -0800
Committer: navina <na...@apache.org>
Committed: Wed Feb 15 17:17:01 2017 -0800
----------------------------------------------------------------------
.../samza/zk/ScheduleAfterDebounceTime.java | 86 +++++++++++++++
.../samza/zk/TestScheduleAfterDebounceTime.java | 110 +++++++++++++++++++
2 files changed, 196 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/09bf8339/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
new file mode 100644
index 0000000..0a4db6d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class allows scheduling a Runnable actions after some debounce time.
+ * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
+ * future in a map, keyed by the action name. Here we predefine some actions, which are used in the
+ * ZK based standalone app.
+ */
+public class ScheduleAfterDebounceTime {
+ public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+ public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
+
+ // Action name when the JobModel version changes
+ public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
+
+ // Action name when the Processor membership changes
+ public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+
+ // Action name when the Processor Data changes
+ public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
+
+ public static final int DEBOUNCE_TIME_MS = 2000;
+
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
+ private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+
+ synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
+ // check if this action has been scheduled already
+ ScheduledFuture sf = futureHandles.get(actionName);
+ if (sf != null && !sf.isDone()) {
+ LOG.info("DEBOUNCE: cancel future for " + actionName);
+ // attempt to cancel
+ if (!sf.cancel(false)) {
+ try {
+ sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ // we ignore the exception
+ LOG.warn("cancel for action " + actionName + " failed with ", e);
+ }
+ }
+ futureHandles.remove(actionName);
+ }
+ // schedule a new task
+ sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS);
+ LOG.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
+ futureHandles.put(actionName, sf);
+ }
+
+ public void stopScheduler() {
+ // shutdown executor service
+ scheduledExecutorService.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/09bf8339/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
new file mode 100644
index 0000000..e57372f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link ScheduleAfterDebounceTime}: an action runs only after its debounce time has passed,
+ * and re-scheduling under the same action name replaces the pending action.
+ */
+public class TestScheduleAfterDebounceTime {
+  private static final long DEBOUNCE_TIME = 500;
+
+  // Written by the scheduled actions on the debounce thread and read by the test thread, hence volatile.
+  // The increment in TestObj.inc() is not atomic, but all writes come from the single scheduler thread.
+  volatile int i = 0;
+
+  @Before
+  public void setup() {
+    // nothing to prepare; every test creates its own ScheduleAfterDebounceTime instance
+  }
+
+  /** Target of the scheduled actions; mutates the counter of the enclosing test instance. */
+  class TestObj {
+    public void inc() {
+      i++;
+    }
+
+    public void setTo(int val) {
+      i = val;
+    }
+
+    public void doNothing() {
+
+    }
+  }
+
+  /** Sleeps for the given time and fails the current test if the sleep is interrupted. */
+  private static void sleepOrFail(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      // keep the interrupt status visible to the test runner before failing
+      Thread.currentThread().interrupt();
+      Assert.fail("Sleep was interrupted");
+    }
+  }
+
+  /** A scheduled action must not run immediately, and must have run once the debounce time has passed. */
+  @Test
+  public void testSchedule() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    try {
+      final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
+      debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, testObj::inc);
+      // action is delayed
+      Assert.assertEquals(0, i);
+
+      sleepOrFail(DEBOUNCE_TIME + 10);
+
+      // debounce time passed
+      Assert.assertEquals(1, i);
+    } finally {
+      // release the scheduler thread created by this test
+      debounceTimer.stopScheduler();
+    }
+  }
+
+  /**
+   * Re-scheduling under the same name must cancel the pending action, while scheduling under a
+   * different name must leave it untouched.
+   */
+  @Test
+  public void testCancelAndSchedule() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    try {
+      final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
+      debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, testObj::inc);
+      Assert.assertEquals(0, i);
+
+      // next schedule should cancel the previous one with the same name
+      debounceTimer.scheduleAfterDebounceTime("TEST1", 2 * DEBOUNCE_TIME, () -> testObj.setTo(100));
+
+      sleepOrFail(DEBOUNCE_TIME + 10);
+      // still should be the old value
+      Assert.assertEquals(0, i);
+
+      // this schedule should not cancel the previous one, because it has different name
+      debounceTimer.scheduleAfterDebounceTime("TEST2", DEBOUNCE_TIME, testObj::doNothing);
+
+      sleepOrFail(3 * DEBOUNCE_TIME + 10);
+      Assert.assertEquals(100, i);
+    } finally {
+      // release the scheduler thread created by this test
+      debounceTimer.stopScheduler();
+    }
+  }
+}