You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/20 20:58:20 UTC

[beam] branch master updated: [BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 041329b  [BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)
041329b is described below

commit 041329bbe2454943d6b021ffd4b3795539c2bfef
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Wed Mar 20 13:58:11 2019 -0700

    [BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)
---
 runners/samza/build.gradle                         |   1 -
 .../beam/runners/samza/SamzaPipelineOptions.java   |   6 +
 .../beam/runners/samza/SamzaPipelineResult.java    |  21 +-
 .../apache/beam/runners/samza/runtime/DoFnOp.java  |  52 ++---
 .../beam/runners/samza/runtime/GroupByKeyOp.java   |  21 +-
 .../beam/runners/samza/runtime/KeyedTimerData.java |  87 +++++++-
 .../org/apache/beam/runners/samza/runtime/Op.java  |   2 +-
 .../beam/runners/samza/runtime/OpAdapter.java      |  17 +-
 .../runners/samza/runtime/SamzaDoFnRunners.java    |   2 +-
 .../samza/runtime/SamzaStoreStateInternals.java    |  81 +++++---
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 209 ++++++++++++++-----
 .../beam/runners/samza/runtime/TimerKey.java       |  93 ---------
 .../translation/ParDoBoundMultiTranslator.java     |   4 -
 .../runners/samza/runtime/KeyedTimerDataTest.java  |  52 +++++
 .../runtime/SamzaTimerInternalsFactoryTest.java    | 229 +++++++++++++++++++++
 website/src/_data/capability-matrix.yml            |   6 +-
 16 files changed, 657 insertions(+), 226 deletions(-)

diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index a1454e1..faa4d51 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -84,7 +84,6 @@ task validatesRunner(type: Test) {
     excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
     excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
     excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index e5955a6..cee55c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -88,4 +88,10 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   Boolean getStateDurable();
 
   void setStateDurable(Boolean stateDurable);
+
+  @Description("The maximum number of event-time timers buffered in memory for a transform.")
+  @Default.Integer(50000)
+  int getTimerBufferSize();
+
+  void setTimerBufferSize(int timerBufferSize);
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index c7c78ae..9badbc3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -98,7 +99,8 @@ public class SamzaPipelineResult implements PipelineResult {
       case UnsuccessfulFinish:
         LOG.error(status.getThrowable().getMessage(), status.getThrowable());
         return new StateInfo(
-            State.FAILED, new Pipeline.PipelineExecutionException(status.getThrowable()));
+            State.FAILED,
+            new Pipeline.PipelineExecutionException(getUserCodeException(status.getThrowable())));
       default:
         return new StateInfo(State.UNKNOWN);
     }
@@ -117,4 +119,21 @@ public class SamzaPipelineResult implements PipelineResult {
       this.error = error;
     }
   }
+
+  /**
+   * Some of the Beam unit tests relying on the exception message to do assertion. This function
+   * will find the original UserCodeException so the message will be exposed directly.
+   */
+  private static Throwable getUserCodeException(Throwable throwable) {
+    Throwable t = throwable;
+    while (t != null) {
+      if (t instanceof UserCodeException) {
+        return t;
+      }
+
+      t = t.getCause();
+    }
+
+    return throwable;
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 09a0154..663f0d0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -40,7 +40,6 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.samza.SamzaExecutionContext;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -56,7 +55,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -141,12 +139,11 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
   public void open(
       Config config,
       TaskContext context,
-      TimerRegistry<TimerKey<Void>> timerRegistry,
+      TimerRegistry<KeyedTimerData<Void>> timerRegistry,
       OpEmitter<OutT> emitter) {
     this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    this.timerInternalsFactory = new SamzaTimerInternalsFactory(keyCoder, timerRegistry);
 
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     final SamzaPipelineOptions pipelineOptions =
@@ -156,7 +153,18 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
             .as(SamzaPipelineOptions.class);
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
-        createStateInternalFactory(null, context, pipelineOptions, signature, mainOutputTag);
+        SamzaStoreStateInternals.createStateInternalFactory(
+            null, context, pipelineOptions, signature, mainOutputTag);
+
+    this.timerInternalsFactory =
+        SamzaTimerInternalsFactory.createTimerInternalFactory(
+            keyCoder,
+            (TimerRegistry) timerRegistry,
+            getTimerStateId(signature),
+            nonKeyedStateInternalsFactory,
+            windowingStrategy,
+            pipelineOptions);
+
     this.sideInputHandler =
         new SideInputHandler(sideInputs, nonKeyedStateInternalsFactory.stateInternalsForKey(null));
 
@@ -208,34 +216,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     doFnInvoker.invokeSetup();
   }
 
-  static SamzaStoreStateInternals.Factory createStateInternalFactory(
-      Coder<?> keyCoder,
-      TaskContext context,
-      SamzaPipelineOptions pipelineOptions,
-      DoFnSignature signature,
-      TupleTag<?> mainOutputTag) {
-    final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
-    final Map<String, KeyValueStore<byte[], byte[]>> stores =
-        new HashMap<>(SamzaStoreStateInternals.getBeamStore(context));
-
-    final Coder stateKeyCoder;
-    if (keyCoder != null) {
-      signature
-          .stateDeclarations()
-          .keySet()
-          .forEach(
-              stateId ->
-                  stores.put(stateId, (KeyValueStore<byte[], byte[]>) context.getStore(stateId)));
-      stateKeyCoder = keyCoder;
-    } else {
-      stateKeyCoder = VoidCoder.of();
+  private String getTimerStateId(DoFnSignature signature) {
+    final StringBuilder builder = new StringBuilder("timer");
+    if (signature.usesTimers()) {
+      signature.timerDeclarations().keySet().forEach(key -> builder.append(key));
     }
-    return new SamzaStoreStateInternals.Factory<>(
-        // TODO: ??? what to do with empty output?
-        mainOutputTag == null ? "null" : mainOutputTag.getId(),
-        stores,
-        stateKeyCoder,
-        batchGetSize);
+    return builder.toString();
   }
 
   @Override
@@ -322,6 +308,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     pushbackFnRunner.startBundle();
     fireTimer(keyedTimerData);
     pushbackFnRunner.finishBundle();
+
+    this.timerInternalsFactory.removeProcessingTimer((KeyedTimerData) keyedTimerData);
   }
 
   @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index 3f1c663..76f7a32 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
 public class GroupByKeyOp<K, InputT, OutputT>
     implements Op<KeyedWorkItem<K, InputT>, KV<K, OutputT>, K> {
   private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyOp.class);
+  private static final String TIMER_STATE_ID = "timer";
 
   private final TupleTag<KV<K, OutputT>> mainOutputTag;
   private final KeyedWorkItemCoder<K, InputT> inputCoder;
@@ -97,7 +98,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
   public void open(
       Config config,
       TaskContext context,
-      TimerRegistry<TimerKey<K>> timerRegistry,
+      TimerRegistry<KeyedTimerData<K>> timerRegistry,
       OpEmitter<KV<K, OutputT>> emitter) {
     this.pipelineOptions =
         Base64Serializer.deserializeUnchecked(
@@ -105,17 +106,29 @@ public class GroupByKeyOp<K, InputT, OutputT>
             .get()
             .as(SamzaPipelineOptions.class);
 
+    final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
+        SamzaStoreStateInternals.createStateInternalFactory(
+            null, context, pipelineOptions, null, mainOutputTag);
+
     final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter);
 
     this.stateInternalsFactory =
         new SamzaStoreStateInternals.Factory<>(
             mainOutputTag.getId(),
-            SamzaStoreStateInternals.getBeamStore(context),
+            Collections.singletonMap(
+                SamzaStoreStateInternals.BEAM_STORE,
+                SamzaStoreStateInternals.getBeamStore(context)),
             keyCoder,
             pipelineOptions.getStoreBatchGetSize());
 
     this.timerInternalsFactory =
-        new SamzaTimerInternalsFactory<>(inputCoder.getKeyCoder(), timerRegistry);
+        SamzaTimerInternalsFactory.createTimerInternalFactory(
+            keyCoder,
+            timerRegistry,
+            TIMER_STATE_ID,
+            nonKeyedStateInternalsFactory,
+            windowingStrategy,
+            pipelineOptions);
 
     final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
         GroupAlsoByWindowViaWindowSetNewDoFn.create(
@@ -192,6 +205,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
     fnRunner.startBundle();
     fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
     fnRunner.finishBundle();
+
+    timerInternalsFactory.removeProcessingTimer(keyedTimerData);
   }
 
   private void fireTimer(K key, TimerData timer) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 481996c..2f3b809 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -17,8 +17,24 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
 
 /**
  * {@link TimerInternals.TimerData} with key, used by {@link SamzaTimerInternalsFactory}. Implements
@@ -29,7 +45,7 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
   private final K key;
   private final TimerInternals.TimerData timerData;
 
-  public KeyedTimerData(byte[] keyBytes, K key, TimerInternals.TimerData timerData) {
+  public KeyedTimerData(byte[] keyBytes, K key, TimerData timerData) {
     this.keyBytes = keyBytes;
     this.key = key;
     this.timerData = timerData;
@@ -102,4 +118,73 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
     result = 31 * result + timerData.hashCode();
     return result;
   }
+
+  /**
+   * Coder for {@link KeyedTimerData}. Note we don't use the {@link
+   * org.apache.beam.runners.core.TimerInternals.TimerDataCoder} here directly since we want to
+   * en/decode timestamp first so the timers will be sorted in the state.
+   */
+  public static class KeyedTimerDataCoder<K> extends StructuredCoder<KeyedTimerData<K>> {
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+
+    private final Coder<K> keyCoder;
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    KeyedTimerDataCoder(Coder<K> keyCoder, Coder<? extends BoundedWindow> windowCoder) {
+      this.keyCoder = keyCoder;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void encode(KeyedTimerData<K> value, OutputStream outStream)
+        throws CoderException, IOException {
+
+      final TimerData timer = value.getTimerData();
+      // encode the timestamp first
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+      STRING_CODER.encode(timer.getTimerId(), outStream);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+      STRING_CODER.encode(timer.getDomain().name(), outStream);
+
+      if (keyCoder != null) {
+        keyCoder.encode(value.key, outStream);
+      }
+    }
+
+    @Override
+    public KeyedTimerData<K> decode(InputStream inStream) throws CoderException, IOException {
+      // decode the timestamp first
+      final Instant timestamp = INSTANT_CODER.decode(inStream);
+      final String timerId = STRING_CODER.decode(inStream);
+      final StateNamespace namespace =
+          StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
+      final TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
+      final TimerData timer = TimerData.of(timerId, namespace, timestamp, domain);
+
+      byte[] keyBytes = null;
+      K key = null;
+      if (keyCoder != null) {
+        key = keyCoder.decode(inStream);
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+          keyCoder.encode(key, baos);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not encode key: " + key, e);
+        }
+        keyBytes = baos.toByteArray();
+      }
+
+      return new KeyedTimerData(keyBytes, key, timer);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.asList(keyCoder, windowCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
index c9cceab..793ce6a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
@@ -42,7 +42,7 @@ public interface Op<InT, OutT, K> extends Serializable {
   default void open(
       Config config,
       TaskContext taskContext,
-      TimerRegistry<TimerKey<K>> timerRegistry,
+      TimerRegistry<KeyedTimerData<K>> timerRegistry,
       OpEmitter<OutT> emitter) {}
 
   void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 21a3fb9..49601ff 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,8 +21,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.samza.config.Config;
@@ -39,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class OpAdapter<InT, OutT, K>
     implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
         WatermarkFunction<OpMessage<OutT>>,
-        TimerFunction<TimerKey<K>, OpMessage<OutT>>,
+        TimerFunction<KeyedTimerData<K>, OpMessage<OutT>>,
         Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
 
@@ -68,7 +66,7 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public final void registerTimer(TimerRegistry<TimerKey<K>> timerRegistry) {
+  public final void registerTimer(TimerRegistry<KeyedTimerData<K>> timerRegistry) {
     assert taskContext != null;
 
     op.open(config, taskContext, timerRegistry, emitter);
@@ -126,19 +124,10 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public Collection<OpMessage<OutT>> onTimer(TimerKey<K> timerKey, long time) {
+  public Collection<OpMessage<OutT>> onTimer(KeyedTimerData<K> keyedTimerData, long time) {
     assert outputList.isEmpty();
 
     try {
-      final TimerData timerData =
-          TimerData.of(
-              timerKey.getTimerId(),
-              timerKey.getStateNamespace(),
-              new Instant(time),
-              TimeDomain.PROCESSING_TIME);
-      final KeyedTimerData<K> keyedTimerData =
-          new KeyedTimerData<>(timerKey.getKeyBytes(), timerKey.getKey(), timerData);
-
       op.processTimer(keyedTimerData);
     } catch (Exception e) {
       LOG.error("Op {} threw an exception during processing timer", this.getClass().getName(), e);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index e50fe30..0dd93ee 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -76,7 +76,7 @@ public class SamzaDoFnRunners {
     final StateInternals stateInternals;
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
-        DoFnOp.createStateInternalFactory(
+        SamzaStoreStateInternals.createStateInternalFactory(
             keyCoder, taskContext, pipelineOptions, signature, mainOutputTag);
 
     final SamzaExecutionContext executionContext =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 0c545fe..9597ae9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -28,6 +26,7 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,12 +36,14 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.state.SamzaMapState;
 import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
 import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.MapState;
@@ -57,7 +58,9 @@ import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
@@ -69,7 +72,7 @@ import org.joda.time.Instant;
 
 /** {@link StateInternals} that uses Samza local {@link KeyValueStore} to manage state. */
 public class SamzaStoreStateInternals<K> implements StateInternals {
-  private static final String BEAM_STORE = "beamStore";
+  static final String BEAM_STORE = "beamStore";
 
   private static ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos =
       new ThreadLocal<>();
@@ -91,11 +94,39 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
     this.batchGetSize = batchGetSize;
   }
 
-  public static Map<String, KeyValueStore<byte[], byte[]>> getBeamStore(TaskContext context) {
-    @SuppressWarnings("unchecked")
-    final KeyValueStore<byte[], byte[]> beamStore =
-        (KeyValueStore<byte[], byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
-    return Collections.singletonMap(BEAM_STORE, beamStore);
+  @SuppressWarnings("unchecked")
+  static KeyValueStore<byte[], byte[]> getBeamStore(TaskContext context) {
+    return (KeyValueStore<byte[], byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
+  }
+
+  static Factory createStateInternalFactory(
+      Coder<?> keyCoder,
+      TaskContext context,
+      SamzaPipelineOptions pipelineOptions,
+      DoFnSignature signature,
+      TupleTag<?> mainOutputTag) {
+    final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
+    final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
+    stores.put(BEAM_STORE, getBeamStore(context));
+
+    final Coder stateKeyCoder;
+    if (keyCoder != null) {
+      signature
+          .stateDeclarations()
+          .keySet()
+          .forEach(
+              stateId ->
+                  stores.put(stateId, (KeyValueStore<byte[], byte[]>) context.getStore(stateId)));
+      stateKeyCoder = keyCoder;
+    } else {
+      stateKeyCoder = VoidCoder.of();
+    }
+    return new Factory<>(
+        // TODO: ??? what to do with empty output?
+        mainOutputTag == null ? "null" : mainOutputTag.getId(),
+        stores,
+        stateKeyCoder,
+        batchGetSize);
   }
 
   @Override
@@ -536,13 +567,13 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
   private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
       implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
 
-    private static final int MAX_KEY_SIZE = 100000; // 100K bytes
     private final Coder<KeyT> keyCoder;
-    private final byte[] maxKey;
     private final int storeKeySize;
     private final List<KeyValueIterator<byte[], byte[]>> openIterators =
         Collections.synchronizedList(new ArrayList<>());
 
+    private int maxKeySize;
+
     protected SamzaMapStateImpl(
         StateNamespace namespace,
         StateTag<? extends State> address,
@@ -551,27 +582,15 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       super(namespace, address, valueCoder);
 
       this.keyCoder = keyCoder;
-      this.maxKey = new byte[MAX_KEY_SIZE];
       this.storeKeySize = getEncodedStoreKey().length;
-
-      final byte[] encodedKey = getEncodedStoreKey();
-      checkState(
-          encodedKey.length < MAX_KEY_SIZE,
-          "Encoded key size %s is longer than the max key size (100 KB) supported",
-          encodedKey.length);
-
-      Arrays.fill(maxKey, (byte) 0xff);
-      System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
+      // initial max key size is around 100k, so we can restore timer keys
+      this.maxKeySize = this.storeKeySize + 100_000;
     }
 
     @Override
     public void put(KeyT key, ValueT value) {
       final byte[] encodedKey = encodeKey(key);
-      checkState(
-          encodedKey.length < MAX_KEY_SIZE,
-          "Encoded key size %s is longer than the max key size (100 KB) supported",
-          encodedKey.length);
-
+      maxKeySize = Math.max(maxKeySize, encodedKey.length);
       store.put(encodedKey, encodeValue(value));
     }
 
@@ -648,6 +667,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
 
     @Override
     public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
+      final byte[] maxKey = createMaxKey();
       final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
       openIterators.add(kvIter);
 
@@ -688,6 +708,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
      */
     private <OutputT> Iterable<OutputT> createIterable(
         SerializableFunction<org.apache.samza.storage.kv.Entry<byte[], byte[]>, OutputT> fn) {
+      final byte[] maxKey = createMaxKey();
       final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
       final List<Entry<byte[], byte[]>> iterable = ImmutableList.copyOf(kvIter);
       kvIter.close();
@@ -714,6 +735,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
 
     @Override
     public void clear() {
+      final byte[] maxKey = createMaxKey();
       final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
       while (kvIter.hasNext()) {
         store.delete(kvIter.next().getKey());
@@ -741,6 +763,15 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       }
     }
 
+    private byte[] createMaxKey() {
+      byte[] maxKey = new byte[maxKeySize];
+      Arrays.fill(maxKey, (byte) 0xff);
+
+      final byte[] encodedKey = getEncodedStoreKey();
+      System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
+      return maxKey;
+    }
+
     @Override
     public void closeIterators() {
       openIterators.forEach(KeyValueIterator::close);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 41ae79a..5e16acd 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -21,18 +21,22 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Iterator;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.runners.samza.state.SamzaSetState;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.operators.TimerRegistry;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -46,19 +50,46 @@ import org.slf4j.LoggerFactory;
 public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
 
-  private final Map<TimerKey, KeyedTimerData<K>> timerMap = new HashMap<>();
-  private final NavigableSet<KeyedTimerData<K>> eventTimeTimers = new TreeSet<>();
-
+  private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
   private final Coder<K> keyCoder;
-  private final TimerRegistry<TimerKey<K>> timerRegistry;
+  private final TimerRegistry<KeyedTimerData<K>> timerRegistry;
+  private final int timerBufferSize;
+  private final SamzaTimerState state;
 
-  // TODO: use BoundedWindow.TIMESTAMP_MIN_VALUE when KafkaIO emits watermarks in bounds.
-  private Instant inputWatermark = new Instant(Long.MIN_VALUE);
+  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
   private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
-  public SamzaTimerInternalsFactory(Coder<K> keyCoder, TimerRegistry<TimerKey<K>> timerRegistry) {
+  private SamzaTimerInternalsFactory(
+      Coder<K> keyCoder,
+      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      int timerBufferSize,
+      String timerStateId,
+      SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+      Coder<BoundedWindow> windowCoder) {
     this.keyCoder = keyCoder;
     this.timerRegistry = timerRegistry;
+    this.timerBufferSize = timerBufferSize;
+    this.eventTimeTimers = new TreeSet<>();
+    this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
+  }
+
+  static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
+      Coder<K> keyCoder,
+      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      String timerStateId,
+      SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+      WindowingStrategy<?, BoundedWindow> windowingStrategy,
+      SamzaPipelineOptions pipelineOptions) {
+
+    final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+    return new SamzaTimerInternalsFactory<>(
+        keyCoder,
+        timerRegistry,
+        pipelineOptions.getTimerBufferSize(),
+        timerStateId,
+        nonKeyedStateInternalsFactory,
+        windowCoder);
   }
 
   @Override
@@ -74,9 +105,9 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     } else {
       final ByteArrayOutputStream baos = new ByteArrayOutputStream();
       try {
-        keyCoder.encode(key, baos, Coder.Context.OUTER);
+        keyCoder.encode(key, baos);
       } catch (IOException e) {
-        throw new RuntimeException("Could not encode key: " + key);
+        throw new RuntimeException("Could not encode key: " + key, e);
       }
       keyBytes = baos.toByteArray();
     }
@@ -111,23 +142,22 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     while (!eventTimeTimers.isEmpty()
         && eventTimeTimers.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
       final KeyedTimerData<K> keyedTimerData = eventTimeTimers.pollFirst();
-      final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
-
-      final TimerKey<K> timerKey =
-          new TimerKey<>(
-              keyedTimerData.getKey(),
-              keyedTimerData.getKeyBytes(),
-              timerData.getNamespace(),
-              timerData.getTimerId());
-
-      timerMap.remove(timerKey);
-
       readyTimers.add(keyedTimerData);
+      state.deletePersisted(keyedTimerData);
+
+      // if all the buffered timers are processed, load the next batch from state
+      if (eventTimeTimers.isEmpty()) {
+        state.loadEventTimeTimers();
+      }
     }
 
     return readyTimers;
   }
 
+  public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
+    state.deletePersisted(keyedTimerData);
+  }
+
   public Instant getInputWatermark() {
     return inputWatermark;
   }
@@ -154,31 +184,20 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     @Override
     public void setTimer(TimerData timerData) {
       final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
-      final TimerKey<K> timerKey =
-          new TimerKey<>(key, keyBytes, timerData.getNamespace(), timerData.getTimerId());
+
+      // persist it first
+      state.persist(keyedTimerData);
 
       switch (timerData.getDomain()) {
         case EVENT_TIME:
-          final KeyedTimerData<K> oldTimer = timerMap.get(timerKey);
-
-          if (oldTimer != null) {
-            if (!oldTimer.getTimerData().getDomain().equals(timerData.getDomain())) {
-              throw new IllegalArgumentException(
-                  String.format(
-                      "Attempt to set %s for time domain %s, "
-                          + "but it is already set for time domain %s",
-                      timerData.getTimerId(),
-                      timerData.getDomain(),
-                      oldTimer.getTimerData().getDomain()));
-            }
-            deleteTimer(oldTimer.getTimerData());
-          }
           eventTimeTimers.add(keyedTimerData);
-          timerMap.put(timerKey, keyedTimerData);
+          while (eventTimeTimers.size() > timerBufferSize) {
+            eventTimeTimers.pollLast();
+          }
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.register(timerKey, timerData.getTimestamp().getMillis());
+          timerRegistry.register(keyedTimerData, timerData.getTimestamp().getMillis());
           break;
 
         default:
@@ -200,19 +219,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
     @Override
     public void deleteTimer(TimerData timerData) {
-      final TimerKey<K> timerKey =
-          new TimerKey<>(key, keyBytes, timerData.getNamespace(), timerData.getTimerId());
+      final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
+
+      state.deletePersisted(keyedTimerData);
 
       switch (timerData.getDomain()) {
         case EVENT_TIME:
-          final KeyedTimerData<K> keyedTimerData = timerMap.remove(timerKey);
-          if (keyedTimerData != null) {
-            eventTimeTimers.remove(keyedTimerData);
-          }
+          eventTimeTimers.remove(keyedTimerData);
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.delete(timerKey);
+          timerRegistry.delete(keyedTimerData);
           break;
 
         default:
@@ -243,4 +260,102 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       return outputWatermark;
     }
   }
+
+  private class SamzaTimerState {
+    private final SamzaSetState<KeyedTimerData<K>> eventTimerTimerState;
+    private final SamzaSetState<KeyedTimerData<K>> processingTimerTimerState;
+
+    SamzaTimerState(
+        String timerStateId,
+        SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+        Coder<BoundedWindow> windowCoder) {
+
+      this.eventTimerTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-et",
+                          new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+      this.processingTimerTimerState =
+          (SamzaSetState<KeyedTimerData<K>>)
+              nonKeyedStateInternalsFactory
+                  .stateInternalsForKey(null)
+                  .state(
+                      StateNamespaces.global(),
+                      StateTags.set(
+                          timerStateId + "-pt",
+                          new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+      restore();
+    }
+
+    void persist(KeyedTimerData<K> keyedTimerData) {
+      switch (keyedTimerData.getTimerData().getDomain()) {
+        case EVENT_TIME:
+          if (!eventTimeTimers.contains(keyedTimerData)) {
+            eventTimerTimerState.add(keyedTimerData);
+          }
+          break;
+
+        case PROCESSING_TIME:
+          processingTimerTimerState.add(keyedTimerData);
+          break;
+
+        default:
+          throw new UnsupportedOperationException(
+              String.format("%s currently only supports event time", SamzaRunner.class));
+      }
+    }
+
+    void deletePersisted(KeyedTimerData<K> keyedTimerData) {
+      switch (keyedTimerData.getTimerData().getDomain()) {
+        case EVENT_TIME:
+          eventTimerTimerState.remove(keyedTimerData);
+          break;
+
+        case PROCESSING_TIME:
+          processingTimerTimerState.remove(keyedTimerData);
+          break;
+
+        default:
+          throw new UnsupportedOperationException(
+              String.format("%s currently only supports event time", SamzaRunner.class));
+      }
+    }
+
+    private void loadEventTimeTimers() {
+      if (!eventTimerTimerState.isEmpty().read()) {
+        final Iterator<KeyedTimerData<K>> iter = eventTimerTimerState.readIterator().read();
+        for (int i = 0; i < timerBufferSize && iter.hasNext(); i++) {
+          eventTimeTimers.add(iter.next());
+        }
+
+        // manually close the iterator here
+        final SamzaStoreStateInternals.KeyValueIteratorState iteratorState =
+            (SamzaStoreStateInternals.KeyValueIteratorState) eventTimerTimerState;
+        iteratorState.closeIterators();
+      }
+    }
+
+    private void loadProcessingTimeTimers() {
+      if (!processingTimerTimerState.isEmpty().read()) {
+        final Iterator<KeyedTimerData<K>> iter = processingTimerTimerState.readIterator().read();
+        // since the iterator will reach to the end, it will be closed automatically
+        while (iter.hasNext()) {
+          final KeyedTimerData<K> keyedTimerData = iter.next();
+          timerRegistry.register(
+              keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
+        }
+      }
+    }
+
+    private void restore() {
+      loadEventTimeTimers();
+      loadProcessingTimeTimers();
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java
deleted file mode 100644
index 507eb62..0000000
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza.runtime;
-
-import org.apache.beam.runners.core.StateNamespace;
-
-/** Timer key which is used to register and delete timers. */
-class TimerKey<K> {
-  private final K key;
-  private final byte[] keyBytes;
-  private final StateNamespace stateNamespace;
-  private final String timerId;
-
-  TimerKey(K key, byte[] keyBytes, StateNamespace stateNamespace, String timerId) {
-    this.key = key;
-    this.keyBytes = keyBytes;
-    this.stateNamespace = stateNamespace;
-    this.timerId = timerId;
-  }
-
-  public K getKey() {
-    return key;
-  }
-
-  public byte[] getKeyBytes() {
-    return keyBytes;
-  }
-
-  public StateNamespace getStateNamespace() {
-    return stateNamespace;
-  }
-
-  public String getTimerId() {
-    return timerId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    TimerKey<?> timerKey = (TimerKey<?>) o;
-
-    if (key != null ? !key.equals(timerKey.key) : timerKey.key != null) {
-      return false;
-    }
-    if (!stateNamespace.equals(timerKey.stateNamespace)) {
-      return false;
-    }
-
-    return timerId.equals(timerKey.timerId);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = key != null ? key.hashCode() : 0;
-    result = 31 * result + stateNamespace.hashCode();
-    result = 31 * result + timerId.hashCode();
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "TimerKey{"
-        + "key="
-        + key
-        + ", stateNamespace="
-        + stateNamespace
-        + ", timerId='"
-        + timerId
-        + '\''
-        + '}';
-  }
-}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index be38fec..25eddb7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -92,10 +92,6 @@ class ParDoBoundMultiTranslator<InT, OutT>
     final Coder<?> keyCoder =
         signature.usesState() ? ((KvCoder<?, ?>) input.getCoder()).getKeyCoder() : null;
 
-    if (signature.usesTimers()) {
-      throw new UnsupportedOperationException("DoFn with timers is not currently supported");
-    }
-
     if (signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException("Splittable DoFn is not currently supported");
     }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
new file mode 100644
index 0000000..e521774
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for {@link KeyedTimerData}. */
+public class KeyedTimerDataTest {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  @Test
+  public void testCoder() throws Exception {
+    final TimerInternals.TimerData td =
+        TimerInternals.TimerData.of(
+            "timer", StateNamespaces.global(), new Instant(), TimeDomain.EVENT_TIME);
+
+    final String key = "timer-key";
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    STRING_CODER.encode(key, baos);
+    final byte[] keyBytes = baos.toByteArray();
+    final KeyedTimerData<String> ktd = new KeyedTimerData<>(keyBytes, key, td);
+
+    final KeyedTimerData.KeyedTimerDataCoder<String> ktdCoder =
+        new KeyedTimerData.KeyedTimerDataCoder<>(STRING_CODER, GlobalWindow.Coder.INSTANCE);
+
+    CoderProperties.coderDecodeEncodeEqual(ktdCoder, ktd);
+  }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
new file mode 100644
index 0000000..6e8b9be
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.apache.samza.storage.kv.RocksDbKeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Tests for {@link SamzaTimerInternalsFactory}. Covers both event-time timers and processing-timer
+ * timers.
+ */
+public class SamzaTimerInternalsFactoryTest {
+  private static RocksDbKeyValueStore createStore(String name) {
+    final Options options = new Options();
+    options.setCreateIfMissing(true);
+
+    return new RocksDbKeyValueStore(
+        new File(System.getProperty("java.io.tmpdir") + "/" + name),
+        options,
+        new MapConfig(),
+        false,
+        "beamStore",
+        new WriteOptions(),
+        new FlushOptions(),
+        new KeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));
+  }
+
+  private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
+      SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
+    final TaskContext context = mock(TaskContext.class);
+    when(context.getStore(anyString())).thenReturn(store);
+    final TupleTag<?> mainOutputTag = new TupleTag<>("output");
+
+    return SamzaStoreStateInternals.createStateInternalFactory(
+        null, context, pipelineOptions, null, mainOutputTag);
+  }
+
+  private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(
+      TimerRegistry<KeyedTimerData<String>> timerRegistry,
+      String timerStateId,
+      SamzaPipelineOptions pipelineOptions,
+      RocksDbKeyValueStore store) {
+
+    final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
+        createNonKeyedStateInternalsFactory(pipelineOptions, store);
+
+    return SamzaTimerInternalsFactory.createTimerInternalFactory(
+        StringUtf8Coder.of(),
+        timerRegistry,
+        timerStateId,
+        nonKeyedStateInternalsFactory,
+        (WindowingStrategy) WindowingStrategy.globalDefault(),
+        pipelineOptions);
+  }
+
+  private static class TestTimerRegistry implements TimerRegistry<KeyedTimerData<String>> {
+    private final List<KeyedTimerData<String>> timers = new ArrayList<>();
+
+    @Override
+    public void register(KeyedTimerData<String> key, long timestamp) {
+      timers.add(key);
+    }
+
+    @Override
+    public void delete(KeyedTimerData<String> key) {
+      timers.remove(key);
+    }
+  }
+
+  @Test
+  public void testEventTimeTimers() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setTimerBufferSize(1);
+
+    final RocksDbKeyValueStore store = createStore("store1");
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+    final TimerInternals.TimerData timer1 =
+        TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timer1);
+
+    final TimerInternals.TimerData timer2 =
+        TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timer2);
+
+    timerInternalsFactory.setInputWatermark(new Instant(5));
+    Collection<KeyedTimerData<String>> readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertTrue(readyTimers.isEmpty());
+
+    timerInternalsFactory.setInputWatermark(new Instant(20));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertEquals(1, readyTimers.size());
+    assertEquals(timer1, readyTimers.iterator().next().getTimerData());
+
+    timerInternalsFactory.setInputWatermark(new Instant(150));
+    readyTimers = timerInternalsFactory.removeReadyTimers();
+    assertEquals(1, readyTimers.size());
+    assertEquals(timer2, readyTimers.iterator().next().getTimerData());
+
+    store.close();
+  }
+
+  @Test
+  public void testRestore() {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    pipelineOptions.setTimerBufferSize(1);
+
+    RocksDbKeyValueStore store = createStore("store2");
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+    final TimerInternals.TimerData timer1 =
+        TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timer1);
+
+    final TimerInternals.TimerData timer2 =
+        TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+    timerInternals.setTimer(timer2);
+
+    store.close();
+
+    // restore by creating a new instance
+    store = createStore("store2");
+    final SamzaTimerInternalsFactory<String> restoredFactory =
+        createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+    restoredFactory.setInputWatermark(new Instant(150));
+    Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
+    assertEquals(2, readyTimers.size());
+
+    store.close();
+  }
+
+  @Test
+  public void testProcessingTimeTimers() throws IOException {
+    final SamzaPipelineOptions pipelineOptions =
+        PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+
+    RocksDbKeyValueStore store = createStore("store3");
+    TestTimerRegistry timerRegistry = new TestTimerRegistry();
+
+    final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+        createTimerInternalsFactory(timerRegistry, "timer", pipelineOptions, store);
+
+    final StateNamespace nameSpace = StateNamespaces.global();
+    final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+    final TimerInternals.TimerData timer1 =
+        TimerInternals.TimerData.of(
+            "timer1", nameSpace, new Instant(10), TimeDomain.PROCESSING_TIME);
+    timerInternals.setTimer(timer1);
+
+    final TimerInternals.TimerData timer2 =
+        TimerInternals.TimerData.of(
+            "timer2", nameSpace, new Instant(100), TimeDomain.PROCESSING_TIME);
+    timerInternals.setTimer(timer2);
+
+    assertEquals(2, timerRegistry.timers.size());
+
+    store.close();
+
+    // restore by creating a new instance
+    store = createStore("store3");
+    TestTimerRegistry restoredRegistry = new TestTimerRegistry();
+    final SamzaTimerInternalsFactory<String> restoredFactory =
+        createTimerInternalsFactory(restoredRegistry, "timer", pipelineOptions, store);
+
+    assertEquals(2, restoredRegistry.timers.size());
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    StringUtf8Coder.of().encode("testKey", baos);
+    final byte[] keyBytes = baos.toByteArray();
+    restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, "testKey", timer1));
+    restoredFactory.removeProcessingTimer(new KeyedTimerData(keyBytes, "testKey", timer2));
+
+    store.close();
+  }
+}
diff --git a/website/src/_data/capability-matrix.yml b/website/src/_data/capability-matrix.yml
index 1b85303..007de40 100644
--- a/website/src/_data/capability-matrix.yml
+++ b/website/src/_data/capability-matrix.yml
@@ -1210,9 +1210,9 @@ categories:
             l2: non-merging windows
             l3: ''
           - class: samza
-            l1: 'No'
-            l2: ''
-            l3: ''
+            l1: 'Partially'
+            l2: non-merging windows
+            l3: The Samza Runner supports timers in non-merging windows.
           - class: nemo
             l1: 'No'
             l2: not implemented