You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/02/16 01:17:15 UTC

samza git commit: SAMZA-1087: Schedule after debounce time

Repository: samza
Updated Branches:
  refs/heads/master 2226e3e71 -> 09bf8339c


SAMZA-1087: Schedule after debounce time

SAMZA-1087: Allows scheduling an action (a Runnable) after some de-bounce delay.

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@linkedin.com>

Closes #49 from sborya/ScheduleAfterDebounceTime1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/09bf8339
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/09bf8339
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/09bf8339

Branch: refs/heads/master
Commit: 09bf8339ce7ed893c3a2971d49c91fda845f1675
Parents: 2226e3e
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Feb 15 17:17:01 2017 -0800
Committer: navina <na...@apache.org>
Committed: Wed Feb 15 17:17:01 2017 -0800

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     |  86 +++++++++++++++
 .../samza/zk/TestScheduleAfterDebounceTime.java | 110 +++++++++++++++++++
 2 files changed, 196 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/09bf8339/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
new file mode 100644
index 0000000..0a4db6d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class allows scheduling a Runnable actions after some debounce time.
+ * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
+ * future in a map, keyed by the action name. Here we predefine some actions, which are used in the
+ * ZK based standalone app.
+ */
+public class ScheduleAfterDebounceTime {
+  public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+  public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
+
+  // Action name when the JobModel version changes
+  public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
+
+  // Action name when the Processor membership changes
+  public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
+
+  // Action name when the Processor Data changes
+  public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
+
+  public static final int DEBOUNCE_TIME_MS = 2000;
+
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
+  private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+
+  synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
+    // check if this action has been scheduled already
+    ScheduledFuture sf = futureHandles.get(actionName);
+    if (sf != null && !sf.isDone()) {
+      LOG.info("DEBOUNCE: cancel future for " + actionName);
+      // attempt to cancel
+      if (!sf.cancel(false)) {
+        try {
+          sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          // we ignore the exception
+          LOG.warn("cancel for action " + actionName + " failed with ", e);
+        }
+      }
+      futureHandles.remove(actionName);
+    }
+    // schedule a new task
+    sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS);
+    LOG.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
+    futureHandles.put(actionName, sf);
+  }
+
+  public void stopScheduler() {
+    // shutdown executor service
+    scheduledExecutorService.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/09bf8339/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
new file mode 100644
index 0000000..e57372f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestScheduleAfterDebounceTime {
+  private static final long DEBOUNCE_TIME = 500;
+  int i = 0;
+  @Before
+  public void setup() {
+
+  }
+
+  class TestObj {
+    public void inc() {
+      i++;
+    }
+    public void setTo(int val) {
+      i = val;
+    }
+    public void doNothing() {
+
+    }
+  }
+  @Test
+  public void testSchedule() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+
+    final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
+    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
+      {
+        testObj.inc();
+      }
+    );
+    // action is delayed
+    Assert.assertEquals(0, i);
+
+    try {
+      Thread.sleep(DEBOUNCE_TIME + 10);
+    } catch (InterruptedException e) {
+      Assert.fail("Sleep was interrupted");
+    }
+
+    // debounce time passed
+    Assert.assertEquals(1, i);
+  }
+
+  @Test
+  public void testCancelAndSchedule() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+
+    final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
+    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
+      {
+        testObj.inc();
+      }
+    );
+    Assert.assertEquals(0, i);
+
+    // next schedule should cancel the previous one with the same name
+    debounceTimer.scheduleAfterDebounceTime("TEST1", 2 * DEBOUNCE_TIME, () ->
+      {
+        testObj.setTo(100);
+      }
+    );
+
+    try {
+      Thread.sleep(DEBOUNCE_TIME + 10);
+    } catch (InterruptedException e) {
+      Assert.fail("Sleep was interrupted");
+    }
+    // still should be the old value
+    Assert.assertEquals(0, i);
+
+    // this schedule should not cancel the previous one, because it has different name
+    debounceTimer.scheduleAfterDebounceTime("TEST2", DEBOUNCE_TIME, () ->
+      {
+        testObj.doNothing();
+      }
+    );
+
+    try {
+      Thread.sleep(3 * DEBOUNCE_TIME + 10);
+    } catch (InterruptedException e) {
+      Assert.fail("Sleep was interrupted");
+    }
+    Assert.assertEquals(100, i);
+  }
+}