You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/09/26 21:08:27 UTC

[2/2] samza git commit: SAMZA-1880: Rename non-metrics classes which use Timer in their name

SAMZA-1880: Rename non-metrics classes which use Timer in their name

Summary of API changes:
1. TimerRegistry -> KeyScheduler; _register_ -> _schedule_
2. TimerFunction -> SchedulingFunction; _registerTimer_ -> _schedulingInit_, _onTimer_ -> _executeForKey_
3. TimerCallback -> SchedulingCallback _onTimer_ -> _execute_
4. TaskContext: _registerTimer_ -> _scheduleCallback_, _deleteTimer_ -> _deleteScheduledCallback_

Only terminology changes are intended (e.g. classes, var names, logs). No functionality change is intended.
An upcoming PR will further update TaskContext and the access to the scheduling logic.

Author: Cameron Lee <ca...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apaapache.org>

Closes #644 from cameronlee314/rename_timer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/10607f0a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/10607f0a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/10607f0a

Branch: refs/heads/master
Commit: 10607f0a65f1ded52788de490c1fde7ebb6d5bba
Parents: 03410b8
Author: Cameron Lee <ca...@linkedin.com>
Authored: Wed Sep 26 14:08:20 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 26 14:08:20 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/operators/Scheduler.java   |  39 ++++
 .../apache/samza/operators/TimerRegistry.java   |  41 -----
 .../operators/functions/ScheduledFunction.java  |  73 ++++++++
 .../operators/functions/TimerFunction.java      |  65 -------
 .../samza/scheduler/ScheduledCallback.java      |  39 ++++
 .../java/org/apache/samza/task/TaskContext.java |  19 +-
 .../org/apache/samza/task/TimerCallback.java    |  34 ----
 .../apache/samza/container/TaskContextImpl.java |  14 +-
 .../samza/operators/impl/OperatorImpl.java      |  26 +--
 .../samza/operators/impl/OperatorImplGraph.java |   8 +-
 .../operators/impl/WindowOperatorImpl.java      |   2 +-
 .../operators/spec/BroadcastOperatorSpec.java   |   4 +-
 .../operators/spec/FilterOperatorSpec.java      |   6 +-
 .../operators/spec/FlatMapOperatorSpec.java     |   6 +-
 .../samza/operators/spec/InputOperatorSpec.java |   4 +-
 .../samza/operators/spec/JoinOperatorSpec.java  |   6 +-
 .../samza/operators/spec/MapOperatorSpec.java   |   6 +-
 .../samza/operators/spec/MergeOperatorSpec.java |   4 +-
 .../samza/operators/spec/OperatorSpec.java      |   4 +-
 .../operators/spec/OutputOperatorSpec.java      |   4 +-
 .../operators/spec/PartitionByOperatorSpec.java |  12 +-
 .../operators/spec/SendToTableOperatorSpec.java |   4 +-
 .../samza/operators/spec/SinkOperatorSpec.java  |   6 +-
 .../spec/StreamTableJoinOperatorSpec.java       |   6 +-
 .../operators/spec/WindowOperatorSpec.java      |  30 ++--
 .../org/apache/samza/task/AsyncRunLoop.java     |  64 +++----
 .../apache/samza/task/EpochTimeScheduler.java   | 156 ++++++++++++++++
 .../apache/samza/task/SystemTimerScheduler.java | 155 ----------------
 .../apache/samza/container/TaskInstance.scala   |   7 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   4 +-
 .../samza/operators/impl/TestOperatorImpl.java  |   4 +-
 .../operators/spec/OperatorSpecTestUtils.java   |  10 +-
 .../samza/operators/spec/TestOperatorSpec.java  |  37 ++--
 .../spec/TestPartitionByOperatorSpec.java       |  26 +--
 .../operators/spec/TestWindowOperatorSpec.java  |  47 ++---
 .../samza/task/TestEpochTimeScheduler.java      | 176 +++++++++++++++++++
 .../samza/task/TestSystemTimerScheduler.java    | 176 -------------------
 .../sql/translator/TestProjectTranslator.java   |   4 +-
 .../samza/test/framework/SchedulingTest.java    |  60 +++++++
 .../samza/test/framework/TestSchedulingApp.java |  89 ++++++++++
 .../samza/test/framework/TestTimerApp.java      |  88 ----------
 .../apache/samza/test/framework/TimerTest.java  |  60 -------
 42 files changed, 821 insertions(+), 804 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
new file mode 100644
index 0000000..77148f0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/Scheduler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+/**
+ * Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
+ * @param <K> type of the key to schedule
+ */
+public interface Scheduler<K> {
+  /**
+   * Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
+   * @param key unique key associated with the callback to schedule
+   * @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
+   */
+  void schedule(K key, long timestamp);
+
+  /**
+   * Delete the scheduled callback for the provided {@code key}.
+   * @param key key to delete
+   */
+  void delete(K key);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java b/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
deleted file mode 100644
index 64dd4ec..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-/**
- * Allows registering epoch-time timer callbacks from the operators.
- * See {@link org.apache.samza.operators.functions.TimerFunction} for details.
- * @param <K> type of the timer key
- */
-public interface TimerRegistry<K> {
-
-  /**
-   * Register a epoch-time timer with key.
-   * @param key unique timer key
-   * @param timestamp epoch time when the timer will be fired, in milliseconds
-   */
-  void register(K key, long timestamp);
-
-  /**
-   * Delete the timer for the provided key.
-   * @param key key for the timer to delete
-   */
-  void delete(K key);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
new file mode 100644
index 0000000..952948c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.operators.Scheduler;
+
+import java.util.Collection;
+
+
+/**
+ * Allows scheduling a callback for a specific epoch-time.
+ * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
+ * corresponding schedule time occurs.
+ *
+ * <p>
+ * Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
+ * <pre>{@code
+ *    public class ExampleScheduledFn implements FlatMapFunction<String, String>, ScheduledFunction<String, String> {
+ *      // for recurring callbacks, keep track of the scheduler from "schedule"
+ *      private Scheduler scheduler;
+ *
+ *      public void schedule(Scheduler scheduler) {
+ *        // save the scheduler for recurring callbacks
+ *        this.scheduler = scheduler;
+ *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
+ *        scheduler.schedule("do-delayed-logic", time);
+ *      }
+ *      public Collection<String> apply(String s) {
+ *        ...
+ *      }
+ *      public Collection<String> onCallback(String key, long timestamp) {
+ *        // do some logic for key "do-delayed-logic"
+ *        ...
+ *        // for recurring callbacks, call the saved scheduler again
+ *        this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
+ *      }
+ *    }
+ * }</pre>
+ * @param <K> type of the key
+ * @param <OM> type of the output
+ */
+public interface ScheduledFunction<K, OM> {
+  /**
+   * Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks.
+   * @param scheduler used to specify the schedule time(s) and key(s)
+   */
+  void schedule(Scheduler<K> scheduler);
+
+  /**
+   * Returns the output from the scheduling logic corresponding to the key that was triggered.
+   * @param key key corresponding to the callback that got invoked
+   * @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch
+   * @return {@link Collection} of output elements
+   */
+  Collection<OM> onCallback(K key, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
deleted file mode 100644
index 01825c6..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.functions;
-
-import org.apache.samza.operators.TimerRegistry;
-
-import java.util.Collection;
-
-/**
- * Allows timer registration with a key and is invoked when the timer is fired.
- * Key must be a unique identifier for this timer, and is provided in the callback when the timer fires.
- *
- * <p>
- * Example of a {@link FlatMapFunction} with timer:
- * <pre>{@code
- *    public class ExampleTimerFn implements FlatMapFunction<String, String>, TimerFunction<String, String> {
- *      public void registerTimer(TimerRegistry timerRegistry) {
- *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        timerRegistry.register("example-timer", time);
- *      }
- *      public Collection<String> apply(String s) {
- *        ...
- *      }
- *      public Collection<String> onTimer(String key, long timestamp) {
- *        // example-timer fired
- *        ...
- *      }
- *    }
- * }</pre>
- * @param <K> type of the key
- * @param <OM> type of the output
- */
-public interface TimerFunction<K, OM> {
-
-  /**
-   * Registers any epoch-time timers using the registry
-   * @param timerRegistry a keyed {@link TimerRegistry}
-   */
-  void registerTimer(TimerRegistry<K> timerRegistry);
-
-  /**
-   * Returns the output after the timer with key fires.
-   * @param key timer key
-   * @param timestamp time of the epoch-time timer fired, in milliseconds
-   * @return {@link Collection} of output elements
-   */
-  Collection<OM> onTimer(K key, long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
new file mode 100644
index 0000000..8745422
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.scheduler;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * The callback that is invoked when its corresponding schedule time registered via
+ * {@link org.apache.samza.task.TaskContext} is reached.
+ * @param <K> type of the callback key
+ */
+public interface ScheduledCallback<K> {
+  /**
+   * Invoked when the corresponding schedule time is reached.
+   * @param key key for callback
+   * @param collector contains the means of sending message envelopes to the output stream.
+   * @param coordinator manages execution of tasks.
+   */
+  void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index ea2a3bc..eccedba 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 
@@ -76,21 +77,21 @@ public interface TaskContext {
   }
 
   /**
-   * Register a keyed timer with a callback of {@link TimerCallback} in this task.
+   * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
    * The callback will be invoked exclusively with any other operations for this task,
    * e.g. processing, windowing and commit.
-   * @param key timer key
-   * @param timestamp epoch time when the timer will be fired, in milliseconds
-   * @param callback callback when the timer is fired
+   * @param key key for the callback
+   * @param timestamp epoch time when the callback will be fired, in milliseconds
+   * @param callback callback to call when the {@code timestamp} is reached
    * @param <K> type of the key
    */
-  <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
+  <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);
 
   /**
-   * Delete the keyed timer in this task.
-   * Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt.
-   * @param key timer key
+   * Delete the scheduled {@code callback} for the {@code key}.
+   * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
+   * @param key callback key
    * @param <K> type of the key
    */
-  <K> void deleteTimer(K key);
+  <K> void deleteScheduledCallback(K key);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java b/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
deleted file mode 100644
index 3add129..0000000
--- a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-/**
- * The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
- * @param <K> type of the timer key
- */
-public interface TimerCallback<K> {
-  /**
-   * Invoked when the timer of key fires.
-   * @param key timer key
-   * @param collector contains the means of sending message envelopes to the output stream.
-   * @param coordinator manages execution of tasks.
-   */
-  void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index d65be4c..bea6373 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -29,9 +29,9 @@ import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableManager;
-import org.apache.samza.task.SystemTimerScheduler;
+import org.apache.samza.task.EpochTimeScheduler;
 import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TimerCallback;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext {
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
   private final Map<String, Object> objectRegistry = new HashMap<>();
-  private final SystemTimerScheduler timerScheduler;
+  private final EpochTimeScheduler timerScheduler;
 
   private Object userContext = null;
 
@@ -76,7 +76,7 @@ public class TaskContextImpl implements TaskContext {
     this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
-    this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
+    this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
   }
 
   @Override
@@ -134,12 +134,12 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
+  public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
     timerScheduler.setTimer(key, timestamp, callback);
   }
 
   @Override
-  public <K> void deleteTimer(K key) {
+  public <K> void deleteScheduledCallback(K key) {
     timerScheduler.deleteTimer(key);
   }
 
@@ -159,7 +159,7 @@ public class TaskContextImpl implements TaskContext {
     return streamMetadataCache;
   }
 
-  public SystemTimerScheduler getTimerScheduler() {
+  public EpochTimeScheduler getTimerScheduler() {
     return timerScheduler;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index eedf45d..5cafd26 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -25,8 +25,8 @@ import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
@@ -439,19 +439,19 @@ public abstract class OperatorImpl<M, RM> {
 
   /**
    * Returns a registry which allows registering arbitrary system-clock timer with K-typed key.
-   * The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)}
+   * The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)}
    * for timer notifications.
    * @param <K> key type for the timer.
-   * @return an instance of {@link TimerRegistry}
+   * @return an instance of {@link Scheduler}
    */
-  <K> TimerRegistry<K> createOperatorTimerRegistry() {
-    return new TimerRegistry<K>() {
+  <K> Scheduler<K> createOperatorScheduler() {
+    return new Scheduler<K>() {
       @Override
-      public void register(K key, long time) {
-        taskContext.registerTimer(key, time, (k, collector, coordinator) -> {
-            final TimerFunction<K, RM> timerFn = getOperatorSpec().getTimerFn();
-            if (timerFn != null) {
-              final Collection<RM> output = timerFn.onTimer(key, time);
+      public void schedule(K key, long time) {
+        taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
+            final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn();
+            if (scheduledFn != null) {
+              final Collection<RM> output = scheduledFn.onCallback(key, time);
 
               if (!output.isEmpty()) {
                 output.forEach(rm ->
@@ -460,7 +460,7 @@ public abstract class OperatorImpl<M, RM> {
               }
             } else {
               throw new SamzaException(
-                  String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.",
+                  String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.",
                       getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation()));
             }
           });
@@ -468,7 +468,7 @@ public abstract class OperatorImpl<M, RM> {
 
       @Override
       public void delete(K key) {
-        taskContext.deleteTimer(key);
+        taskContext.deleteScheduledCallback(key);
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 367576a..d76c7de 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -28,7 +28,7 @@ import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.util.TimestampedValue;
@@ -172,9 +172,9 @@ public class OperatorImplGraph {
       operatorImpl.init(config, context);
       operatorImpl.registerInputStream(inputStream);
 
-      if (operatorSpec.getTimerFn() != null) {
-        final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry();
-        operatorSpec.getTimerFn().registerTimer(timerRegistry);
+      if (operatorSpec.getScheduledFn() != null) {
+        final Scheduler scheduler = operatorImpl.createOperatorScheduler();
+        operatorSpec.getScheduledFn().schedule(scheduler);
       }
 
       // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 82dc0bf..b175671 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -183,7 +183,7 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
 
   @Override
   public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    LOG.trace("Processing timer.");
+    LOG.trace("Processing time triggers");
     List<WindowPane<K, Object>> results = new ArrayList<>();
     List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
index 2c76e60..fb6515a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
@@ -20,7 +20,7 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> {
@@ -43,7 +43,7 @@ public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
index a5cdb82..4e640dc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -68,7 +68,7 @@ class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.filterFn instanceof TimerFunction ? (TimerFunction) this.filterFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.filterFn instanceof ScheduledFunction ? (ScheduledFunction) this.filterFn : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
index a93a221..160f432 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -41,7 +41,7 @@ class FlatMapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.transformFn instanceof TimerFunction ? (TimerFunction) this.transformFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.transformFn instanceof ScheduledFunction ? (ScheduledFunction) this.transformFn : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index c49443d..1af4806 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -99,7 +99,7 @@ public class InputOperatorSpec extends OperatorSpec<IncomingMessageEnvelope, Obj
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 1b55784..bb6ed59 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.util.TimestampedValue;
@@ -105,8 +105,8 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
   public OperatorSpec getLeftInputOpSpec() {

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
index 1e2190b..6ce522f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -71,7 +71,7 @@ class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.mapFn instanceof TimerFunction ? (TimerFunction) this.mapFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.mapFn instanceof ScheduledFunction ? (ScheduledFunction) this.mapFn : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
index 987f72c..3685c5f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -45,7 +45,7 @@ class MergeOperatorSpec<M> extends StreamOperatorSpec<M, M> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 1e021f5..0442f7c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -25,7 +25,7 @@ import java.util.LinkedHashSet;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -143,5 +143,5 @@ public abstract class OperatorSpec<M, OM> implements Serializable {
 
   abstract public WatermarkFunction getWatermarkFn();
 
-  abstract public TimerFunction getTimerFn();
+  abstract public ScheduledFunction getScheduledFn();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 40a5c0e..d6238b8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -59,7 +59,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index d6bf3d9..069c867 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -55,10 +55,10 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
       MapFunction<? super M, ? extends K> keyFunction,
       MapFunction<? super M, ? extends V> valueFunction, String opId) {
     super(OpCode.PARTITION_BY, opId);
-    checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction),
-        "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
-    checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction),
-        "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
+    checkArgument(!(keyFunction instanceof ScheduledFunction || keyFunction instanceof WatermarkFunction),
+        "keyFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
+    checkArgument(!(valueFunction instanceof ScheduledFunction || valueFunction instanceof WatermarkFunction),
+        "valueFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
     this.outputStream = outputStream;
     this.keyFunction = keyFunction;
     this.valueFunction = valueFunction;
@@ -86,7 +86,7 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index 22f393e..bf032a2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -58,7 +58,7 @@ public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void>
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index aa0f066..91e2775 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -57,7 +57,7 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return sinkFn instanceof TimerFunction ? (TimerFunction) sinkFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return sinkFn instanceof ScheduledFunction ? (ScheduledFunction) sinkFn : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
index c7735c6..1849c64 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -19,8 +19,8 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -66,8 +66,8 @@ public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 8d1ad29..ede16a5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,7 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.AnyTrigger;
@@ -64,14 +64,14 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
   WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) {
     super(OpCode.WINDOW, opId);
     checkArgument(window.getInitializer() == null ||
-        !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer.");
+        !(window.getInitializer() instanceof ScheduledFunction || window.getInitializer() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the initializer.");
     checkArgument(window.getKeyExtractor() == null ||
-        !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor.");
+        !(window.getKeyExtractor() instanceof ScheduledFunction || window.getKeyExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the keyExtractor.");
     checkArgument(window.getEventTimeExtractor() == null ||
-        !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor.");
+        !(window.getEventTimeExtractor() instanceof ScheduledFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the eventTimeExtractor.");
     this.window = window;
   }
 
@@ -88,21 +88,21 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
    * @return the default triggering interval
    */
   public long getDefaultTriggerMs() {
-    List<TimeBasedTrigger> timerTriggers = new ArrayList<>();
+    List<TimeBasedTrigger> timeBasedTriggers = new ArrayList<>();
 
     if (window.getDefaultTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger()));
     }
     if (window.getEarlyTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger()));
     }
     if (window.getLateTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger()));
     }
 
-    LOG.info("Got {} timer triggers", timerTriggers.size());
+    LOG.info("Got {} time-based triggers", timeBasedTriggers.size());
 
-    List<Long> candidateDurations = timerTriggers.stream()
+    List<Long> candidateDurations = timeBasedTriggers.stream()
         .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis())
         .collect(Collectors.toList());
 
@@ -135,9 +135,9 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     FoldLeftFunction fn = window.getFoldLeftFunction();
-    return fn instanceof TimerFunction ? (TimerFunction) fn : null;
+    return fn instanceof ScheduledFunction ? (ScheduledFunction) fn : null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 3b3e008..111869c 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -330,7 +330,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     COMMIT,
     PROCESS,
     END_OF_STREAM,
-    TIMER,
+    SCHEDULER,
     NO_OP
   }
 
@@ -374,10 +374,10 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         }, commitMs, commitMs, TimeUnit.MILLISECONDS);
       }
 
-      final SystemTimerScheduler timerFactory = task.context().getTimerScheduler();
-      if (timerFactory != null) {
-        timerFactory.registerListener(() -> {
-            state.needTimer();
+      final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler();
+      if (epochTimeScheduler != null) {
+        epochTimeScheduler.registerListener(() -> {
+            state.needScheduler();
           });
       }
     }
@@ -409,8 +409,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         case WINDOW:
           window();
           break;
-        case TIMER:
-          timer();
+        case SCHEDULER:
+          scheduler();
           break;
         case COMMIT:
           commit();
@@ -551,8 +551,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       }
     }
 
-    private void timer() {
-      state.startTimer();
+    private void scheduler() {
+      state.startScheduler();
       Runnable timerWorker = new Runnable() {
         @Override
         public void run() {
@@ -560,26 +560,26 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
 
             long startTime = clock.nanoTime();
-            task.timer(coordinator);
+            task.scheduler(coordinator);
             containerMetrics.timerNs().update(clock.nanoTime() - startTime);
 
             coordinatorRequests.update(coordinator);
-            state.doneTimer();
+            state.doneScheduler();
           } catch (Throwable t) {
-            log.error("Task {} timer failed", task.taskName(), t);
+            log.error("Task {} scheduler failed", task.taskName(), t);
             abort(t);
           } finally {
-            log.trace("Task {} timer completed", task.taskName());
+            log.trace("Task {} scheduler completed", task.taskName());
             resume();
           }
         }
       };
 
       if (threadPool != null) {
-        log.trace("Task {} timer runs on the thread pool", task.taskName());
+        log.trace("Task {} scheduler runs on the thread pool", task.taskName());
         threadPool.submit(timerWorker);
       } else {
-        log.trace("Task {} timer runs on the run loop thread", task.taskName());
+        log.trace("Task {} scheduler runs on the run loop thread", task.taskName());
         timerWorker.run();
       }
     }
@@ -655,12 +655,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private final class AsyncTaskState {
     private volatile boolean needWindow = false;
     private volatile boolean needCommit = false;
-    private volatile boolean needTimer = false;
+    private volatile boolean needScheduler = false;
     private volatile boolean complete = false;
     private volatile boolean endOfStream = false;
     private volatile boolean windowInFlight = false;
     private volatile boolean commitInFlight = false;
-    private volatile boolean timerInFlight = false;
+    private volatile boolean schedulerInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
 
@@ -706,28 +706,28 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         needCommit = true;
       }
 
-      boolean opInFlight = windowInFlight || commitInFlight || timerInFlight;
+      boolean opInFlight = windowInFlight || commitInFlight || schedulerInFlight;
       /*
        * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread
        * and either of the following conditions are true.
-       * a) When process, window, commit and timer are not in progress.
+       * a) When process, window, commit and scheduler are not in progress.
        * b) When task.async.commit is true and window, commit are not in progress.
        */
       if (needCommit) {
         return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight;
-      } else if (needWindow || needTimer || endOfStream) {
+      } else if (needWindow || needScheduler || endOfStream) {
         /*
-         * A task is ready for window, timer or end-of-stream operation.
+         * A task is ready for window, scheduler or end-of-stream operation.
          */
         return messagesInFlight.get() == 0 && !opInFlight;
       } else {
         /*
          * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency
          * and either of the following conditions are true.
-         * a) When window, commit and timer are not in progress.
-         * b) When task.async.commit is true and window and timer are not in progress.
+         * a) When window, commit and scheduler are not in progress.
+         * b) When task.async.commit is true and window and scheduler are not in progress.
          */
-        return messagesInFlight.get() < maxConcurrency && !windowInFlight && !timerInFlight && (isAsyncCommitEnabled || !commitInFlight);
+        return messagesInFlight.get() < maxConcurrency && !windowInFlight && !schedulerInFlight && (isAsyncCommitEnabled || !commitInFlight);
       }
     }
 
@@ -741,7 +741,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       if (isReady()) {
         if (needCommit) return WorkerOp.COMMIT;
         else if (needWindow) return WorkerOp.WINDOW;
-        else if (needTimer) return WorkerOp.TIMER;
+        else if (needScheduler) return WorkerOp.SCHEDULER;
         else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM;
         else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
       }
@@ -756,8 +756,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       needCommit = true;
     }
 
-    private void needTimer() {
-      needTimer = true;
+    private void needScheduler() {
+      needScheduler = true;
     }
 
     private void startWindow() {
@@ -775,9 +775,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
-    private void startTimer() {
-      needTimer = false;
-      timerInFlight = true;
+    private void startScheduler() {
+      needScheduler = false;
+      schedulerInFlight = true;
     }
 
     private void doneCommit() {
@@ -793,8 +793,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
-    private void doneTimer() {
-      timerInFlight = false;
+    private void doneScheduler() {
+      schedulerInFlight = false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
new file mode 100644
index 0000000..3f50d9a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.scheduler.ScheduledCallback;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Per-task scheduler for keyed timers.
+ * It does the following things:
+ * 1) schedules the timer on the {@link ScheduledExecutorService}.
+ * 2) keeps track of the timers created and timers that are ready.
+ * 3) triggers listener whenever a timer fires.
+ */
+public class EpochTimeScheduler {
+
+  /**
+   * For run loop to listen to timer firing so it can schedule the callbacks.
+   */
+  public interface TimerListener {
+    void onTimer();
+  }
+
+  private final ScheduledExecutorService executor;
+  private final Map<Object, ScheduledFuture> scheduledFutures = new ConcurrentHashMap<>();
+  private final Map<TimerKey<?>, ScheduledCallback> readyTimers = new ConcurrentHashMap<>();
+  private TimerListener timerListener;
+
+  public static EpochTimeScheduler create(ScheduledExecutorService executor) {
+    return new EpochTimeScheduler(executor);
+  }
+
+  private EpochTimeScheduler(ScheduledExecutorService executor) {
+    this.executor = executor;
+  }
+
+  public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
+    checkState(!scheduledFutures.containsKey(key),
+        String.format("Duplicate key %s registration for the same timer", key));
+
+    final long delay = timestamp - System.currentTimeMillis();
+    final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
+        scheduledFutures.remove(key);
+        readyTimers.put(TimerKey.of(key, timestamp), callback);
+
+        if (timerListener != null) {
+          timerListener.onTimer();
+        }
+      }, delay > 0 ? delay : 0, TimeUnit.MILLISECONDS);
+    scheduledFutures.put(key, scheduledFuture);
+  }
+
+  public <K> void deleteTimer(K key) {
+    final ScheduledFuture<?> scheduledFuture = scheduledFutures.remove(key);
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(false);
+    }
+  }
+
+  void registerListener(TimerListener listener) {
+    timerListener = listener;
+  }
+
+  public Map<TimerKey<?>, ScheduledCallback> removeReadyTimers() {
+    final Map<TimerKey<?>, ScheduledCallback> timers = new TreeMap<>(readyTimers);
+    readyTimers.keySet().removeAll(timers.keySet());
+    return timers;
+  }
+
+  public static class TimerKey<K> implements Comparable<TimerKey<K>> {
+    private final K key;
+    private final long time;
+
+    static <K> TimerKey<K> of(K key, long time) {
+      return new TimerKey<>(key, time);
+    }
+
+    private TimerKey(K key, long time) {
+      this.key = key;
+      this.time = time;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public long getTime() {
+      return time;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TimerKey<?> timerKey = (TimerKey<?>) o;
+      if (time != ((TimerKey<?>) o).time) {
+        return false;
+      }
+      return key.equals(timerKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = key.hashCode();
+      result = 31 * result + Long.valueOf(time).hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "TimerKey{"
+          + "key=" + key
+          + ", time='" + time + '\''
+          + '}';
+    }
+
+    @Override
+    public int compareTo(TimerKey<K> o) {
+      final int timeCompare = Long.compare(time, o.time);
+      if (timeCompare != 0) {
+        return timeCompare;
+      }
+
+      return key.hashCode() - o.key.hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
deleted file mode 100644
index aa9792b..0000000
--- a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Per-task scheduler for keyed timers.
- * It does the following things:
- * 1) schedules the timer on the {@link ScheduledExecutorService}.
- * 2) keeps track of the timers created and timers that are ready.
- * 3) triggers listener whenever a timer fires.
- */
-public class SystemTimerScheduler {
-
-  /**
-   * For run loop to listen to timer firing so it can schedule the callbacks.
-   */
-  public interface TimerListener {
-    void onTimer();
-  }
-
-  private final ScheduledExecutorService executor;
-  private final Map<Object, ScheduledFuture> scheduledFutures = new ConcurrentHashMap<>();
-  private final Map<TimerKey<?>, TimerCallback> readyTimers = new ConcurrentHashMap<>();
-  private TimerListener timerListener;
-
-  public static SystemTimerScheduler create(ScheduledExecutorService executor) {
-    return new SystemTimerScheduler(executor);
-  }
-
-  private SystemTimerScheduler(ScheduledExecutorService executor) {
-    this.executor = executor;
-  }
-
-  public <K> void setTimer(K key, long timestamp, TimerCallback<K> callback) {
-    checkState(!scheduledFutures.containsKey(key),
-        String.format("Duplicate key %s registration for the same timer", key));
-
-    final long delay = timestamp - System.currentTimeMillis();
-    final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
-        scheduledFutures.remove(key);
-        readyTimers.put(TimerKey.of(key, timestamp), callback);
-
-        if (timerListener != null) {
-          timerListener.onTimer();
-        }
-      }, delay > 0 ? delay : 0, TimeUnit.MILLISECONDS);
-    scheduledFutures.put(key, scheduledFuture);
-  }
-
-  public <K> void deleteTimer(K key) {
-    final ScheduledFuture<?> scheduledFuture = scheduledFutures.remove(key);
-    if (scheduledFuture != null) {
-      scheduledFuture.cancel(false);
-    }
-  }
-
-  void registerListener(TimerListener listener) {
-    timerListener = listener;
-  }
-
-  public Map<TimerKey<?>, TimerCallback> removeReadyTimers() {
-    final Map<TimerKey<?>, TimerCallback> timers = new TreeMap<>(readyTimers);
-    readyTimers.keySet().removeAll(timers.keySet());
-    return timers;
-  }
-
-  public static class TimerKey<K> implements Comparable<TimerKey<K>> {
-    private final K key;
-    private final long time;
-
-    static <K> TimerKey<K> of(K key, long time) {
-      return new TimerKey<>(key, time);
-    }
-
-    private TimerKey(K key, long time) {
-      this.key = key;
-      this.time = time;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    public long getTime() {
-      return time;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      TimerKey<?> timerKey = (TimerKey<?>) o;
-      if (time != ((TimerKey<?>) o).time) {
-        return false;
-      }
-      return key.equals(timerKey.key);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = key.hashCode();
-      result = 31 * result + Long.valueOf(time).hashCode();
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return "TimerKey{"
-          + "key=" + key
-          + ", time='" + time + '\''
-          + '}';
-    }
-
-    @Override
-    public int compareTo(TimerKey<K> o) {
-      final int timeCompare = Long.compare(time, o.time);
-      if (timeCompare != 0) {
-        return timeCompare;
-      }
-
-      return key.hashCode() - o.key.hashCode();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index d85f10f..9f4fd17 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -28,6 +28,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.scheduler.ScheduledCallback
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
 import org.apache.samza.system._
@@ -221,12 +222,12 @@ class TaskInstance(
     }
   }
 
-  def timer(coordinator: ReadableCoordinator) {
-    trace("Timer for taskName: %s" format taskName)
+  def scheduler(coordinator: ReadableCoordinator) {
+    trace("Scheduler for taskName: %s" format taskName)
 
     exceptionHandler.maybeHandle {
       context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry =>
-        entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, collector, coordinator)
+        entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 57ae6d8..abbbd3b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -176,7 +176,7 @@ public class TestOperatorSpecGraph {
     }
 
     @Override
-    public TimerFunction getTimerFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 249ff09..6d12d99 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -28,7 +28,7 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
@@ -220,7 +220,7 @@ public class TestOperatorImpl {
     }
 
     @Override
-    public TimerFunction getTimerFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
index a34fdc3..454a661 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.TableImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.table.TableSpec;
@@ -71,7 +71,7 @@ public class OperatorSpecTestUtils {
     assertNotEquals(oOpSpec, nOpSpec);
     assertEquals(oOpSpec.getOpId(), nOpSpec.getOpId());
     assertEquals(oOpSpec.getOpCode(), nOpSpec.getOpCode());
-    assertTimerFnsNotEqual(oOpSpec.getTimerFn(), nOpSpec.getTimerFn());
+    assertScheduledFnsNotEqual(oOpSpec.getScheduledFn(), nOpSpec.getScheduledFn());
     assertWatermarkFnNotEqual(nOpSpec.getWatermarkFn(), nOpSpec.getWatermarkFn());
     assertAllOperators(oOpSpec.getRegisteredOperatorSpecs(), nOpSpec.getRegisteredOperatorSpecs());
   }
@@ -83,11 +83,11 @@ public class OperatorSpecTestUtils {
     assertNotEquals(watermarkFn, watermarkFn1);
   }
 
-  private static void assertTimerFnsNotEqual(TimerFunction timerFn, TimerFunction timerFn1) {
-    if (timerFn == timerFn1 && timerFn == null) {
+  private static void assertScheduledFnsNotEqual(ScheduledFunction scheduledFn, ScheduledFunction scheduledFn1) {
+    if (scheduledFn == scheduledFn1 && scheduledFn == null) {
       return;
     }
-    assertNotEquals(timerFn, timerFn1);
+    assertNotEquals(scheduledFn, scheduledFn1);
   }
 
   private static void assertClonedTables(Map<TableSpec, TableImpl> originalTables, Map<TableSpec, TableImpl> clonedTables) {

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
index a9ccd12..860e630 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -33,7 +33,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -74,7 +74,8 @@ public class TestOperatorSpec {
     }
   }
 
-  private static class MapWithTimerFn implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>, TimerFunction<String, TestOutputMessageEnvelope> {
+  private static class MapWithScheduledFn implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>,
+                                                     ScheduledFunction<String, TestOutputMessageEnvelope> {
 
     @Override
     public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
@@ -82,12 +83,12 @@ public class TestOperatorSpec {
     }
 
     @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
+    public void schedule(Scheduler<String> scheduler) {
 
     }
 
     @Override
-    public Collection<TestOutputMessageEnvelope> onTimer(String key, long timestamp) {
+    public Collection<TestOutputMessageEnvelope> onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -164,8 +165,8 @@ public class TestOperatorSpec {
     assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -187,8 +188,8 @@ public class TestOperatorSpec {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -209,8 +210,8 @@ public class TestOperatorSpec {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -360,13 +361,13 @@ public class TestOperatorSpec {
     assertEquals(streamOperatorSpec.getWatermarkFn(), testMapFn);
     assertNotNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
-  public void testMapStreamOperatorSpecWithTimer() {
-    MapWithTimerFn testMapFn = new MapWithTimerFn();
+  public void testMapStreamOperatorSpecWithScheduledFunction() {
+    MapWithScheduledFn testMapFn = new MapWithScheduledFn();
 
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> streamOperatorSpec =
         OperatorSpecs.createMapOperatorSpec(testMapFn, "op0");
@@ -378,9 +379,9 @@ public class TestOperatorSpec {
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertEquals(streamOperatorSpec.getTimerFn(), testMapFn);
-    assertNotNull(cloneOperatorSpec.getTimerFn());
-    assertNotEquals(streamOperatorSpec.getTimerFn(), cloneOperatorSpec.getTimerFn());
+    assertEquals(streamOperatorSpec.getScheduledFn(), testMapFn);
+    assertNotNull(cloneOperatorSpec.getScheduledFn());
+    assertNotEquals(streamOperatorSpec.getScheduledFn(), cloneOperatorSpec.getScheduledFn());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/10607f0a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index db7079c..1d98580 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -23,13 +23,13 @@ import java.util.Map;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -58,7 +58,7 @@ public class TestPartitionByOperatorSpec {
   private final String testJobId = "1";
   private final String testRepartitionedStreamName = "parByKey";
 
-  class TimerMapFn implements MapFunction<Object, String>, TimerFunction<String, Object> {
+  class ScheduledMapFn implements MapFunction<Object, String>, ScheduledFunction<String, Object> {
 
     @Override
     public String apply(Object message) {
@@ -66,12 +66,12 @@ public class TestPartitionByOperatorSpec {
     }
 
     @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
+    public void schedule(Scheduler<String> scheduler) {
 
     }
 
     @Override
-    public Collection<Object> onTimer(String key, long timestamp) {
+    public Collection<Object> onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -117,7 +117,7 @@ public class TestPartitionByOperatorSpec {
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getTimerFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -126,7 +126,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getTimerFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -144,7 +144,7 @@ public class TestPartitionByOperatorSpec {
     assertNull(inputOpSpec.getKeySerde());
     assertNull(inputOpSpec.getValueSerde());
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getTimerFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -153,7 +153,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getTimerFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -169,8 +169,8 @@ public class TestPartitionByOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testTimerFunctionAsKeyFn() {
-    TimerMapFn keyFn = new TimerMapFn();
+  public void testScheduledFunctionAsKeyFn() {
+    ScheduledMapFn keyFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(keyFn, m -> m, "parByKey");
@@ -187,8 +187,8 @@ public class TestPartitionByOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testTimerFunctionAsValueFn() {
-    TimerMapFn valueFn = new TimerMapFn();
+  public void testScheduledFunctionAsValueFn() {
+    ScheduledMapFn valueFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");