You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/20 20:58:20 UTC
[beam] branch master updated: [BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 041329b [BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)
041329b is described below
commit 041329bbe2454943d6b021ffd4b3795539c2bfef
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Wed Mar 20 13:58:11 2019 -0700
[BEAM-6866]: SamzaRunner: support timers in ParDo (#8092)
---
runners/samza/build.gradle | 1 -
.../beam/runners/samza/SamzaPipelineOptions.java | 6 +
.../beam/runners/samza/SamzaPipelineResult.java | 21 +-
.../apache/beam/runners/samza/runtime/DoFnOp.java | 52 ++---
.../beam/runners/samza/runtime/GroupByKeyOp.java | 21 +-
.../beam/runners/samza/runtime/KeyedTimerData.java | 87 +++++++-
.../org/apache/beam/runners/samza/runtime/Op.java | 2 +-
.../beam/runners/samza/runtime/OpAdapter.java | 17 +-
.../runners/samza/runtime/SamzaDoFnRunners.java | 2 +-
.../samza/runtime/SamzaStoreStateInternals.java | 81 +++++---
.../samza/runtime/SamzaTimerInternalsFactory.java | 209 ++++++++++++++-----
.../beam/runners/samza/runtime/TimerKey.java | 93 ---------
.../translation/ParDoBoundMultiTranslator.java | 4 -
.../runners/samza/runtime/KeyedTimerDataTest.java | 52 +++++
.../runtime/SamzaTimerInternalsFactoryTest.java | 229 +++++++++++++++++++++
website/src/_data/capability-matrix.yml | 6 +-
16 files changed, 657 insertions(+), 226 deletions(-)
diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index a1454e1..faa4d51 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -84,7 +84,6 @@ task validatesRunner(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
- excludeCategories 'org.apache.beam.sdk.testing.UsesTimersInParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index e5955a6..cee55c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -88,4 +88,10 @@ public interface SamzaPipelineOptions extends PipelineOptions {
Boolean getStateDurable();
void setStateDurable(Boolean stateDurable);
+
+ @Description("The maximum number of event-time timers buffered in memory for a transform.")
+ @Default.Integer(50000)
+ int getTimerBufferSize();
+
+ void setTimerBufferSize(int timerBufferSize);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index c7c78ae..9badbc3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.util.UserCodeException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.runtime.ApplicationRunner;
@@ -98,7 +99,8 @@ public class SamzaPipelineResult implements PipelineResult {
case UnsuccessfulFinish:
LOG.error(status.getThrowable().getMessage(), status.getThrowable());
return new StateInfo(
- State.FAILED, new Pipeline.PipelineExecutionException(status.getThrowable()));
+ State.FAILED,
+ new Pipeline.PipelineExecutionException(getUserCodeException(status.getThrowable())));
default:
return new StateInfo(State.UNKNOWN);
}
@@ -117,4 +119,21 @@ public class SamzaPipelineResult implements PipelineResult {
this.error = error;
}
}
+
+ /**
+ * Some of the Beam unit tests rely on the exception message for their assertions. This function
+ * finds the original UserCodeException in the cause chain so its message is exposed directly.
+ */
+ private static Throwable getUserCodeException(Throwable throwable) {
+ Throwable t = throwable;
+ while (t != null) {
+ if (t instanceof UserCodeException) {
+ return t;
+ }
+
+ t = t.getCause();
+ }
+
+ return throwable;
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 09a0154..663f0d0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -40,7 +40,6 @@ import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.samza.SamzaExecutionContext;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -56,7 +55,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.samza.config.Config;
import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -141,12 +139,11 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
public void open(
Config config,
TaskContext context,
- TimerRegistry<TimerKey<Void>> timerRegistry,
+ TimerRegistry<KeyedTimerData<Void>> timerRegistry,
OpEmitter<OutT> emitter) {
this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.pushbackWatermarkHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
- this.timerInternalsFactory = new SamzaTimerInternalsFactory(keyCoder, timerRegistry);
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final SamzaPipelineOptions pipelineOptions =
@@ -156,7 +153,18 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
.as(SamzaPipelineOptions.class);
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
- createStateInternalFactory(null, context, pipelineOptions, signature, mainOutputTag);
+ SamzaStoreStateInternals.createStateInternalFactory(
+ null, context, pipelineOptions, signature, mainOutputTag);
+
+ this.timerInternalsFactory =
+ SamzaTimerInternalsFactory.createTimerInternalFactory(
+ keyCoder,
+ (TimerRegistry) timerRegistry,
+ getTimerStateId(signature),
+ nonKeyedStateInternalsFactory,
+ windowingStrategy,
+ pipelineOptions);
+
this.sideInputHandler =
new SideInputHandler(sideInputs, nonKeyedStateInternalsFactory.stateInternalsForKey(null));
@@ -208,34 +216,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
doFnInvoker.invokeSetup();
}
- static SamzaStoreStateInternals.Factory createStateInternalFactory(
- Coder<?> keyCoder,
- TaskContext context,
- SamzaPipelineOptions pipelineOptions,
- DoFnSignature signature,
- TupleTag<?> mainOutputTag) {
- final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
- final Map<String, KeyValueStore<byte[], byte[]>> stores =
- new HashMap<>(SamzaStoreStateInternals.getBeamStore(context));
-
- final Coder stateKeyCoder;
- if (keyCoder != null) {
- signature
- .stateDeclarations()
- .keySet()
- .forEach(
- stateId ->
- stores.put(stateId, (KeyValueStore<byte[], byte[]>) context.getStore(stateId)));
- stateKeyCoder = keyCoder;
- } else {
- stateKeyCoder = VoidCoder.of();
+ private String getTimerStateId(DoFnSignature signature) {
+ final StringBuilder builder = new StringBuilder("timer");
+ if (signature.usesTimers()) {
+ signature.timerDeclarations().keySet().forEach(key -> builder.append(key));
}
- return new SamzaStoreStateInternals.Factory<>(
- // TODO: ??? what to do with empty output?
- mainOutputTag == null ? "null" : mainOutputTag.getId(),
- stores,
- stateKeyCoder,
- batchGetSize);
+ return builder.toString();
}
@Override
@@ -322,6 +308,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
pushbackFnRunner.startBundle();
fireTimer(keyedTimerData);
pushbackFnRunner.finishBundle();
+
+ this.timerInternalsFactory.removeProcessingTimer((KeyedTimerData) keyedTimerData);
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index 3f1c663..76f7a32 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
public class GroupByKeyOp<K, InputT, OutputT>
implements Op<KeyedWorkItem<K, InputT>, KV<K, OutputT>, K> {
private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyOp.class);
+ private static final String TIMER_STATE_ID = "timer";
private final TupleTag<KV<K, OutputT>> mainOutputTag;
private final KeyedWorkItemCoder<K, InputT> inputCoder;
@@ -97,7 +98,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
public void open(
Config config,
TaskContext context,
- TimerRegistry<TimerKey<K>> timerRegistry,
+ TimerRegistry<KeyedTimerData<K>> timerRegistry,
OpEmitter<KV<K, OutputT>> emitter) {
this.pipelineOptions =
Base64Serializer.deserializeUnchecked(
@@ -105,17 +106,29 @@ public class GroupByKeyOp<K, InputT, OutputT>
.get()
.as(SamzaPipelineOptions.class);
+ final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
+ SamzaStoreStateInternals.createStateInternalFactory(
+ null, context, pipelineOptions, null, mainOutputTag);
+
final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter);
this.stateInternalsFactory =
new SamzaStoreStateInternals.Factory<>(
mainOutputTag.getId(),
- SamzaStoreStateInternals.getBeamStore(context),
+ Collections.singletonMap(
+ SamzaStoreStateInternals.BEAM_STORE,
+ SamzaStoreStateInternals.getBeamStore(context)),
keyCoder,
pipelineOptions.getStoreBatchGetSize());
this.timerInternalsFactory =
- new SamzaTimerInternalsFactory<>(inputCoder.getKeyCoder(), timerRegistry);
+ SamzaTimerInternalsFactory.createTimerInternalFactory(
+ keyCoder,
+ timerRegistry,
+ TIMER_STATE_ID,
+ nonKeyedStateInternalsFactory,
+ windowingStrategy,
+ pipelineOptions);
final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
GroupAlsoByWindowViaWindowSetNewDoFn.create(
@@ -192,6 +205,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
fnRunner.startBundle();
fireTimer(keyedTimerData.getKey(), keyedTimerData.getTimerData());
fnRunner.finishBundle();
+
+ timerInternalsFactory.removeProcessingTimer(keyedTimerData);
}
private void fireTimer(K key, TimerData timer) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 481996c..2f3b809 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -17,8 +17,24 @@
*/
package org.apache.beam.runners.samza.runtime;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
/**
* {@link TimerInternals.TimerData} with key, used by {@link SamzaTimerInternalsFactory}. Implements
@@ -29,7 +45,7 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
private final K key;
private final TimerInternals.TimerData timerData;
- public KeyedTimerData(byte[] keyBytes, K key, TimerInternals.TimerData timerData) {
+ public KeyedTimerData(byte[] keyBytes, K key, TimerData timerData) {
this.keyBytes = keyBytes;
this.key = key;
this.timerData = timerData;
@@ -102,4 +118,73 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
result = 31 * result + timerData.hashCode();
return result;
}
+
+ /**
+ * Coder for {@link KeyedTimerData}. Note that we don't use the {@link
+ * org.apache.beam.runners.core.TimerInternals.TimerDataCoder} here directly, since we want to
+ * encode/decode the timestamp first so that the timers will be sorted in the state.
+ */
+ public static class KeyedTimerDataCoder<K> extends StructuredCoder<KeyedTimerData<K>> {
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+
+ private final Coder<K> keyCoder;
+ private final Coder<? extends BoundedWindow> windowCoder;
+
+ KeyedTimerDataCoder(Coder<K> keyCoder, Coder<? extends BoundedWindow> windowCoder) {
+ this.keyCoder = keyCoder;
+ this.windowCoder = windowCoder;
+ }
+
+ @Override
+ public void encode(KeyedTimerData<K> value, OutputStream outStream)
+ throws CoderException, IOException {
+
+ final TimerData timer = value.getTimerData();
+ // encode the timestamp first
+ INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+ STRING_CODER.encode(timer.getTimerId(), outStream);
+ STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+ STRING_CODER.encode(timer.getDomain().name(), outStream);
+
+ if (keyCoder != null) {
+ keyCoder.encode(value.key, outStream);
+ }
+ }
+
+ @Override
+ public KeyedTimerData<K> decode(InputStream inStream) throws CoderException, IOException {
+ // decode the timestamp first
+ final Instant timestamp = INSTANT_CODER.decode(inStream);
+ final String timerId = STRING_CODER.decode(inStream);
+ final StateNamespace namespace =
+ StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
+ final TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
+ final TimerData timer = TimerData.of(timerId, namespace, timestamp, domain);
+
+ byte[] keyBytes = null;
+ K key = null;
+ if (keyCoder != null) {
+ key = keyCoder.decode(inStream);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ keyCoder.encode(key, baos);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not encode key: " + key, e);
+ }
+ keyBytes = baos.toByteArray();
+ }
+
+ return new KeyedTimerData(keyBytes, key, timer);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder, windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {}
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
index c9cceab..793ce6a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
@@ -42,7 +42,7 @@ public interface Op<InT, OutT, K> extends Serializable {
default void open(
Config config,
TaskContext taskContext,
- TimerRegistry<TimerKey<K>> timerRegistry,
+ TimerRegistry<KeyedTimerData<K>> timerRegistry,
OpEmitter<OutT> emitter) {}
void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 21a3fb9..49601ff 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,8 +21,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
@@ -39,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class OpAdapter<InT, OutT, K>
implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
WatermarkFunction<OpMessage<OutT>>,
- TimerFunction<TimerKey<K>, OpMessage<OutT>>,
+ TimerFunction<KeyedTimerData<K>, OpMessage<OutT>>,
Serializable {
private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
@@ -68,7 +66,7 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public final void registerTimer(TimerRegistry<TimerKey<K>> timerRegistry) {
+ public final void registerTimer(TimerRegistry<KeyedTimerData<K>> timerRegistry) {
assert taskContext != null;
op.open(config, taskContext, timerRegistry, emitter);
@@ -126,19 +124,10 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public Collection<OpMessage<OutT>> onTimer(TimerKey<K> timerKey, long time) {
+ public Collection<OpMessage<OutT>> onTimer(KeyedTimerData<K> keyedTimerData, long time) {
assert outputList.isEmpty();
try {
- final TimerData timerData =
- TimerData.of(
- timerKey.getTimerId(),
- timerKey.getStateNamespace(),
- new Instant(time),
- TimeDomain.PROCESSING_TIME);
- final KeyedTimerData<K> keyedTimerData =
- new KeyedTimerData<>(timerKey.getKeyBytes(), timerKey.getKey(), timerData);
-
op.processTimer(keyedTimerData);
} catch (Exception e) {
LOG.error("Op {} threw an exception during processing timer", this.getClass().getName(), e);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index e50fe30..0dd93ee 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -76,7 +76,7 @@ public class SamzaDoFnRunners {
final StateInternals stateInternals;
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
- DoFnOp.createStateInternalFactory(
+ SamzaStoreStateInternals.createStateInternalFactory(
keyCoder, taskContext, pipelineOptions, signature, mainOutputTag);
final SamzaExecutionContext executionContext =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 0c545fe..9597ae9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.samza.runtime;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -28,6 +26,7 @@ import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -37,12 +36,14 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
@@ -57,7 +58,9 @@ import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
@@ -69,7 +72,7 @@ import org.joda.time.Instant;
/** {@link StateInternals} that uses Samza local {@link KeyValueStore} to manage state. */
public class SamzaStoreStateInternals<K> implements StateInternals {
- private static final String BEAM_STORE = "beamStore";
+ static final String BEAM_STORE = "beamStore";
private static ThreadLocal<SoftReference<ByteArrayOutputStream>> threadLocalBaos =
new ThreadLocal<>();
@@ -91,11 +94,39 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
this.batchGetSize = batchGetSize;
}
- public static Map<String, KeyValueStore<byte[], byte[]>> getBeamStore(TaskContext context) {
- @SuppressWarnings("unchecked")
- final KeyValueStore<byte[], byte[]> beamStore =
- (KeyValueStore<byte[], byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
- return Collections.singletonMap(BEAM_STORE, beamStore);
+ @SuppressWarnings("unchecked")
+ static KeyValueStore<byte[], byte[]> getBeamStore(TaskContext context) {
+ return (KeyValueStore<byte[], byte[]>) context.getStore(SamzaStoreStateInternals.BEAM_STORE);
+ }
+
+ static Factory createStateInternalFactory(
+ Coder<?> keyCoder,
+ TaskContext context,
+ SamzaPipelineOptions pipelineOptions,
+ DoFnSignature signature,
+ TupleTag<?> mainOutputTag) {
+ final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
+ final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
+ stores.put(BEAM_STORE, getBeamStore(context));
+
+ final Coder stateKeyCoder;
+ if (keyCoder != null) {
+ signature
+ .stateDeclarations()
+ .keySet()
+ .forEach(
+ stateId ->
+ stores.put(stateId, (KeyValueStore<byte[], byte[]>) context.getStore(stateId)));
+ stateKeyCoder = keyCoder;
+ } else {
+ stateKeyCoder = VoidCoder.of();
+ }
+ return new Factory<>(
+ // TODO: ??? what to do with empty output?
+ mainOutputTag == null ? "null" : mainOutputTag.getId(),
+ stores,
+ stateKeyCoder,
+ batchGetSize);
}
@Override
@@ -536,13 +567,13 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
private class SamzaMapStateImpl<KeyT, ValueT> extends AbstractSamzaState<ValueT>
implements SamzaMapState<KeyT, ValueT>, KeyValueIteratorState {
- private static final int MAX_KEY_SIZE = 100000; // 100K bytes
private final Coder<KeyT> keyCoder;
- private final byte[] maxKey;
private final int storeKeySize;
private final List<KeyValueIterator<byte[], byte[]>> openIterators =
Collections.synchronizedList(new ArrayList<>());
+ private int maxKeySize;
+
protected SamzaMapStateImpl(
StateNamespace namespace,
StateTag<? extends State> address,
@@ -551,27 +582,15 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
super(namespace, address, valueCoder);
this.keyCoder = keyCoder;
- this.maxKey = new byte[MAX_KEY_SIZE];
this.storeKeySize = getEncodedStoreKey().length;
-
- final byte[] encodedKey = getEncodedStoreKey();
- checkState(
- encodedKey.length < MAX_KEY_SIZE,
- "Encoded key size %s is longer than the max key size (100 KB) supported",
- encodedKey.length);
-
- Arrays.fill(maxKey, (byte) 0xff);
- System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
+ // initial max key size is around 100k, so we can restore timer keys
+ this.maxKeySize = this.storeKeySize + 100_000;
}
@Override
public void put(KeyT key, ValueT value) {
final byte[] encodedKey = encodeKey(key);
- checkState(
- encodedKey.length < MAX_KEY_SIZE,
- "Encoded key size %s is longer than the max key size (100 KB) supported",
- encodedKey.length);
-
+ maxKeySize = Math.max(maxKeySize, encodedKey.length);
store.put(encodedKey, encodeValue(value));
}
@@ -648,6 +667,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@Override
public ReadableState<Iterator<Map.Entry<KeyT, ValueT>>> readIterator() {
+ final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
openIterators.add(kvIter);
@@ -688,6 +708,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
*/
private <OutputT> Iterable<OutputT> createIterable(
SerializableFunction<org.apache.samza.storage.kv.Entry<byte[], byte[]>, OutputT> fn) {
+ final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
final List<Entry<byte[], byte[]>> iterable = ImmutableList.copyOf(kvIter);
kvIter.close();
@@ -714,6 +735,7 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
@Override
public void clear() {
+ final byte[] maxKey = createMaxKey();
final KeyValueIterator<byte[], byte[]> kvIter = store.range(getEncodedStoreKey(), maxKey);
while (kvIter.hasNext()) {
store.delete(kvIter.next().getKey());
@@ -741,6 +763,15 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
}
}
+ private byte[] createMaxKey() {
+ byte[] maxKey = new byte[maxKeySize];
+ Arrays.fill(maxKey, (byte) 0xff);
+
+ final byte[] encodedKey = getEncodedStoreKey();
+ System.arraycopy(encodedKey, 0, maxKey, 0, encodedKey.length);
+ return maxKey;
+ }
+
@Override
public void closeIterators() {
openIterators.forEach(KeyValueIterator::close);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 41ae79a..5e16acd 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -21,18 +21,22 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.operators.TimerRegistry;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -46,19 +50,46 @@ import org.slf4j.LoggerFactory;
public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
private static final Logger LOG = LoggerFactory.getLogger(SamzaTimerInternalsFactory.class);
- private final Map<TimerKey, KeyedTimerData<K>> timerMap = new HashMap<>();
- private final NavigableSet<KeyedTimerData<K>> eventTimeTimers = new TreeSet<>();
-
+ private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
private final Coder<K> keyCoder;
- private final TimerRegistry<TimerKey<K>> timerRegistry;
+ private final TimerRegistry<KeyedTimerData<K>> timerRegistry;
+ private final int timerBufferSize;
+ private final SamzaTimerState state;
- // TODO: use BoundedWindow.TIMESTAMP_MIN_VALUE when KafkaIO emits watermarks in bounds.
- private Instant inputWatermark = new Instant(Long.MIN_VALUE);
+ private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
- public SamzaTimerInternalsFactory(Coder<K> keyCoder, TimerRegistry<TimerKey<K>> timerRegistry) {
+ private SamzaTimerInternalsFactory(
+ Coder<K> keyCoder,
+ TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ int timerBufferSize,
+ String timerStateId,
+ SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+ Coder<BoundedWindow> windowCoder) {
this.keyCoder = keyCoder;
this.timerRegistry = timerRegistry;
+ this.timerBufferSize = timerBufferSize;
+ this.eventTimeTimers = new TreeSet<>();
+ this.state = new SamzaTimerState(timerStateId, nonKeyedStateInternalsFactory, windowCoder);
+ }
+
+ static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
+ Coder<K> keyCoder,
+ TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ String timerStateId,
+ SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+ WindowingStrategy<?, BoundedWindow> windowingStrategy,
+ SamzaPipelineOptions pipelineOptions) {
+
+ final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+ return new SamzaTimerInternalsFactory<>(
+ keyCoder,
+ timerRegistry,
+ pipelineOptions.getTimerBufferSize(),
+ timerStateId,
+ nonKeyedStateInternalsFactory,
+ windowCoder);
}
@Override
@@ -74,9 +105,9 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
} else {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- keyCoder.encode(key, baos, Coder.Context.OUTER);
+ keyCoder.encode(key, baos);
} catch (IOException e) {
- throw new RuntimeException("Could not encode key: " + key);
+ throw new RuntimeException("Could not encode key: " + key, e);
}
keyBytes = baos.toByteArray();
}
@@ -111,23 +142,22 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
while (!eventTimeTimers.isEmpty()
&& eventTimeTimers.first().getTimerData().getTimestamp().isBefore(inputWatermark)) {
final KeyedTimerData<K> keyedTimerData = eventTimeTimers.pollFirst();
- final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
-
- final TimerKey<K> timerKey =
- new TimerKey<>(
- keyedTimerData.getKey(),
- keyedTimerData.getKeyBytes(),
- timerData.getNamespace(),
- timerData.getTimerId());
-
- timerMap.remove(timerKey);
-
readyTimers.add(keyedTimerData);
+ state.deletePersisted(keyedTimerData);
+
+ // if all the buffered timers are processed, load the next batch from state
+ if (eventTimeTimers.isEmpty()) {
+ state.loadEventTimeTimers();
+ }
}
return readyTimers;
}
+ public void removeProcessingTimer(KeyedTimerData<K> keyedTimerData) {
+ state.deletePersisted(keyedTimerData);
+ }
+
public Instant getInputWatermark() {
return inputWatermark;
}
@@ -154,31 +184,20 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
@Override
public void setTimer(TimerData timerData) {
final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
- final TimerKey<K> timerKey =
- new TimerKey<>(key, keyBytes, timerData.getNamespace(), timerData.getTimerId());
+
+ // persist it first
+ state.persist(keyedTimerData);
switch (timerData.getDomain()) {
case EVENT_TIME:
- final KeyedTimerData<K> oldTimer = timerMap.get(timerKey);
-
- if (oldTimer != null) {
- if (!oldTimer.getTimerData().getDomain().equals(timerData.getDomain())) {
- throw new IllegalArgumentException(
- String.format(
- "Attempt to set %s for time domain %s, "
- + "but it is already set for time domain %s",
- timerData.getTimerId(),
- timerData.getDomain(),
- oldTimer.getTimerData().getDomain()));
- }
- deleteTimer(oldTimer.getTimerData());
- }
eventTimeTimers.add(keyedTimerData);
- timerMap.put(timerKey, keyedTimerData);
+ while (eventTimeTimers.size() > timerBufferSize) {
+ eventTimeTimers.pollLast();
+ }
break;
case PROCESSING_TIME:
- timerRegistry.register(timerKey, timerData.getTimestamp().getMillis());
+ timerRegistry.register(keyedTimerData, timerData.getTimestamp().getMillis());
break;
default:
@@ -200,19 +219,17 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
@Override
public void deleteTimer(TimerData timerData) {
- final TimerKey<K> timerKey =
- new TimerKey<>(key, keyBytes, timerData.getNamespace(), timerData.getTimerId());
+ final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes, key, timerData);
+
+ state.deletePersisted(keyedTimerData);
switch (timerData.getDomain()) {
case EVENT_TIME:
- final KeyedTimerData<K> keyedTimerData = timerMap.remove(timerKey);
- if (keyedTimerData != null) {
- eventTimeTimers.remove(keyedTimerData);
- }
+ eventTimeTimers.remove(keyedTimerData);
break;
case PROCESSING_TIME:
- timerRegistry.delete(timerKey);
+ timerRegistry.delete(keyedTimerData);
break;
default:
@@ -243,4 +260,102 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
return outputWatermark;
}
}
+
+ /**
+ * Keeps timers in two set states obtained from the non-keyed state internals factory: one for
+ * event-time timers (state id suffix {@code "-et"}) and one for processing-time timers (state id
+ * suffix {@code "-pt"}). Constructing an instance immediately restores timers from both states.
+ */
+ private class SamzaTimerState {
+ private final SamzaSetState<KeyedTimerData<K>> eventTimerTimerState;
+ private final SamzaSetState<KeyedTimerData<K>> processingTimerTimerState;
+
+ /**
+ * Looks up both timer set states in the global namespace and then restores the timers they
+ * contain.
+ *
+ * @param timerStateId prefix of the ids of the two set states
+ * @param nonKeyedStateInternalsFactory factory used to obtain the set states
+ * @param windowCoder window coder used by the {@link KeyedTimerData} coder
+ */
+ SamzaTimerState(
+ String timerStateId,
+ SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
+ Coder<BoundedWindow> windowCoder) {
+
+ this.eventTimerTimerState =
+ (SamzaSetState<KeyedTimerData<K>>)
+ nonKeyedStateInternalsFactory
+ .stateInternalsForKey(null)
+ .state(
+ StateNamespaces.global(),
+ StateTags.set(
+ timerStateId + "-et",
+ new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+ this.processingTimerTimerState =
+ (SamzaSetState<KeyedTimerData<K>>)
+ nonKeyedStateInternalsFactory
+ .stateInternalsForKey(null)
+ .state(
+ StateNamespaces.global(),
+ StateTags.set(
+ timerStateId + "-pt",
+ new KeyedTimerData.KeyedTimerDataCoder<>(keyCoder, windowCoder)));
+
+ restore();
+ }
+
+ /**
+ * Adds the timer to the set state that matches its time domain.
+ *
+ * @throws UnsupportedOperationException if the time domain is neither event time nor
+ * processing time
+ */
+ void persist(KeyedTimerData<K> keyedTimerData) {
+ switch (keyedTimerData.getTimerData().getDomain()) {
+ case EVENT_TIME:
+ // a timer that is already in the in-memory buffer is not written to the state again
+ if (!eventTimeTimers.contains(keyedTimerData)) {
+ eventTimerTimerState.add(keyedTimerData);
+ }
+ break;
+
+ case PROCESSING_TIME:
+ processingTimerTimerState.add(keyedTimerData);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s currently only supports event time and processing time",
+ SamzaRunner.class));
+ }
+ }
+
+ /**
+ * Removes the timer from the set state that matches its time domain.
+ *
+ * @throws UnsupportedOperationException if the time domain is neither event time nor
+ * processing time
+ */
+ void deletePersisted(KeyedTimerData<K> keyedTimerData) {
+ switch (keyedTimerData.getTimerData().getDomain()) {
+ case EVENT_TIME:
+ eventTimerTimerState.remove(keyedTimerData);
+ break;
+
+ case PROCESSING_TIME:
+ processingTimerTimerState.remove(keyedTimerData);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s currently only supports event time and processing time",
+ SamzaRunner.class));
+ }
+ }
+
+ /** Copies at most {@code timerBufferSize} event-time timers from the state into the buffer. */
+ private void loadEventTimeTimers() {
+ if (!eventTimerTimerState.isEmpty().read()) {
+ final Iterator<KeyedTimerData<K>> iter = eventTimerTimerState.readIterator().read();
+ try {
+ for (int i = 0; i < timerBufferSize && iter.hasNext(); i++) {
+ eventTimeTimers.add(iter.next());
+ }
+ } finally {
+ // the iterator may not have been read to the end, so close it manually; doing this in
+ // finally also releases it when reading a timer fails part-way
+ ((SamzaStoreStateInternals.KeyValueIteratorState) eventTimerTimerState).closeIterators();
+ }
+ }
+ }
+
+ /** Registers every persisted processing-time timer with the timer registry. */
+ private void loadProcessingTimeTimers() {
+ if (!processingTimerTimerState.isEmpty().read()) {
+ final Iterator<KeyedTimerData<K>> iter = processingTimerTimerState.readIterator().read();
+ // the iterator is read to the end, so it will be closed automatically
+ while (iter.hasNext()) {
+ final KeyedTimerData<K> keyedTimerData = iter.next();
+ timerRegistry.register(
+ keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
+ }
+ }
+ }
+
+ /** Reloads buffered event-time timers and re-registers processing-time timers. */
+ private void restore() {
+ loadEventTimeTimers();
+ loadProcessingTimeTimers();
+ }
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java
deleted file mode 100644
index 507eb62..0000000
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/TimerKey.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza.runtime;
-
-import org.apache.beam.runners.core.StateNamespace;
-
-/** Timer key which is used to register and delete timers. */
-class TimerKey<K> {
- private final K key;
- private final byte[] keyBytes;
- private final StateNamespace stateNamespace;
- private final String timerId;
-
- TimerKey(K key, byte[] keyBytes, StateNamespace stateNamespace, String timerId) {
- this.key = key;
- this.keyBytes = keyBytes;
- this.stateNamespace = stateNamespace;
- this.timerId = timerId;
- }
-
- public K getKey() {
- return key;
- }
-
- public byte[] getKeyBytes() {
- return keyBytes;
- }
-
- public StateNamespace getStateNamespace() {
- return stateNamespace;
- }
-
- public String getTimerId() {
- return timerId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TimerKey<?> timerKey = (TimerKey<?>) o;
-
- if (key != null ? !key.equals(timerKey.key) : timerKey.key != null) {
- return false;
- }
- if (!stateNamespace.equals(timerKey.stateNamespace)) {
- return false;
- }
-
- return timerId.equals(timerKey.timerId);
- }
-
- @Override
- public int hashCode() {
- int result = key != null ? key.hashCode() : 0;
- result = 31 * result + stateNamespace.hashCode();
- result = 31 * result + timerId.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "TimerKey{"
- + "key="
- + key
- + ", stateNamespace="
- + stateNamespace
- + ", timerId='"
- + timerId
- + '\''
- + '}';
- }
-}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index be38fec..25eddb7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -92,10 +92,6 @@ class ParDoBoundMultiTranslator<InT, OutT>
final Coder<?> keyCoder =
signature.usesState() ? ((KvCoder<?, ?>) input.getCoder()).getKeyCoder() : null;
- if (signature.usesTimers()) {
- throw new UnsupportedOperationException("DoFn with timers is not currently supported");
- }
-
if (signature.processElement().isSplittable()) {
throw new UnsupportedOperationException("Splittable DoFn is not currently supported");
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
new file mode 100644
index 0000000..e521774
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Unit tests for {@link KeyedTimerData} and its coder. */
+public class KeyedTimerDataTest {
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+ /**
+ * Runs {@link CoderProperties#coderDecodeEncodeEqual} on an event-time {@link KeyedTimerData}
+ * with a string key, using a coder built from the string coder and the global window coder.
+ */
+ @Test
+ public void testCoder() throws Exception {
+ // encode the key up front; the keyed timer carries both the key and its encoded bytes
+ final String timerKey = "timer-key";
+ final ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+ STRING_CODER.encode(timerKey, keyStream);
+
+ final KeyedTimerData.KeyedTimerDataCoder<String> coderUnderTest =
+ new KeyedTimerData.KeyedTimerDataCoder<>(STRING_CODER, GlobalWindow.Coder.INSTANCE);
+
+ final TimerInternals.TimerData timerData =
+ TimerInternals.TimerData.of(
+ "timer", StateNamespaces.global(), new Instant(), TimeDomain.EVENT_TIME);
+ final KeyedTimerData<String> keyedTimerData =
+ new KeyedTimerData<>(keyStream.toByteArray(), timerKey, timerData);
+
+ CoderProperties.coderDecodeEncodeEqual(coderUnderTest, keyedTimerData);
+ }
+}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
new file mode 100644
index 0000000..6e8b9be
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.apache.samza.storage.kv.RocksDbKeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Tests for {@link SamzaTimerInternalsFactory}. Covers both event-time timers and processing-time
+ * timers.
+ */
+public class SamzaTimerInternalsFactoryTest {
+ /**
+ * Opens a {@link RocksDbKeyValueStore} in the directory {@code name} under {@code
+ * java.io.tmpdir}, creating it if it does not exist yet.
+ */
+ private static RocksDbKeyValueStore createStore(String name) {
+ final Options options = new Options();
+ options.setCreateIfMissing(true);
+
+ return new RocksDbKeyValueStore(
+ new File(System.getProperty("java.io.tmpdir") + "/" + name),
+ options,
+ new MapConfig(),
+ false,
+ "beamStore",
+ new WriteOptions(),
+ new FlushOptions(),
+ new KeyValueStoreMetrics("beamStore", new MetricsRegistryMap()));
+ }
+
+ /**
+ * Creates a state internals factory on top of a mocked {@link TaskContext} that returns {@code
+ * store} for every store name.
+ */
+ private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
+ SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
+ final TaskContext context = mock(TaskContext.class);
+ when(context.getStore(anyString())).thenReturn(store);
+ final TupleTag<?> mainOutputTag = new TupleTag<>("output");
+
+ return SamzaStoreStateInternals.createStateInternalFactory(
+ null, context, pipelineOptions, null, mainOutputTag);
+ }
+
+ /**
+ * Creates the factory under test with a string key coder and the global default windowing
+ * strategy, backed by {@code store}.
+ */
+ private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(
+ TimerRegistry<KeyedTimerData<String>> timerRegistry,
+ String timerStateId,
+ SamzaPipelineOptions pipelineOptions,
+ RocksDbKeyValueStore store) {
+
+ final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
+ createNonKeyedStateInternalsFactory(pipelineOptions, store);
+
+ // the raw cast adapts the global default strategy to the parameter type of the factory method
+ return SamzaTimerInternalsFactory.createTimerInternalFactory(
+ StringUtf8Coder.of(),
+ timerRegistry,
+ timerStateId,
+ nonKeyedStateInternalsFactory,
+ (WindowingStrategy) WindowingStrategy.globalDefault(),
+ pipelineOptions);
+ }
+
+ /** Timer registry that only records the registered keys; the timestamps are ignored. */
+ private static class TestTimerRegistry implements TimerRegistry<KeyedTimerData<String>> {
+ private final List<KeyedTimerData<String>> timers = new ArrayList<>();
+
+ @Override
+ public void register(KeyedTimerData<String> key, long timestamp) {
+ timers.add(key);
+ }
+
+ @Override
+ public void delete(KeyedTimerData<String> key) {
+ timers.remove(key);
+ }
+ }
+
+ /**
+ * Sets two event-time timers with a timer buffer size of 1 and checks that each one becomes
+ * ready only after the input watermark has moved past its timestamp.
+ */
+ @Test
+ public void testEventTimeTimers() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setTimerBufferSize(1);
+
+ final RocksDbKeyValueStore store = createStore("store1");
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+ final TimerInternals.TimerData timer1 =
+ TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer1);
+
+ final TimerInternals.TimerData timer2 =
+ TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer2);
+
+ // watermark is before both timers: nothing is ready
+ timerInternalsFactory.setInputWatermark(new Instant(5));
+ Collection<KeyedTimerData<String>> readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertTrue(readyTimers.isEmpty());
+
+ // watermark is between the two timers: only timer1 is ready
+ timerInternalsFactory.setInputWatermark(new Instant(20));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertEquals(1, readyTimers.size());
+ assertEquals(timer1, readyTimers.iterator().next().getTimerData());
+
+ // watermark is past timer2: it is the only timer left
+ timerInternalsFactory.setInputWatermark(new Instant(150));
+ readyTimers = timerInternalsFactory.removeReadyTimers();
+ assertEquals(1, readyTimers.size());
+ assertEquals(timer2, readyTimers.iterator().next().getTimerData());
+
+ store.close();
+ }
+
+ /**
+ * Sets two event-time timers, reopens the store with a new factory instance and checks that
+ * both timers become ready on the restored factory.
+ */
+ @Test
+ public void testRestore() {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ pipelineOptions.setTimerBufferSize(1);
+
+ RocksDbKeyValueStore store = createStore("store2");
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+ final TimerInternals.TimerData timer1 =
+ TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer1);
+
+ final TimerInternals.TimerData timer2 =
+ TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+ timerInternals.setTimer(timer2);
+
+ store.close();
+
+ // restore by creating a new instance
+ store = createStore("store2");
+ final SamzaTimerInternalsFactory<String> restoredFactory =
+ createTimerInternalsFactory(null, "timer", pipelineOptions, store);
+
+ restoredFactory.setInputWatermark(new Instant(150));
+ Collection<KeyedTimerData<String>> readyTimers = restoredFactory.removeReadyTimers();
+ assertEquals(2, readyTimers.size());
+
+ store.close();
+ }
+
+ /**
+ * Sets two processing-time timers, checks that both are registered, then reopens the store and
+ * checks that a new factory instance registers both timers again. The timers are removed from
+ * the persisted state at the end.
+ */
+ @Test
+ public void testProcessingTimeTimers() throws IOException {
+ final SamzaPipelineOptions pipelineOptions =
+ PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+
+ RocksDbKeyValueStore store = createStore("store3");
+ TestTimerRegistry timerRegistry = new TestTimerRegistry();
+
+ final SamzaTimerInternalsFactory<String> timerInternalsFactory =
+ createTimerInternalsFactory(timerRegistry, "timer", pipelineOptions, store);
+
+ final StateNamespace nameSpace = StateNamespaces.global();
+ final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
+ final TimerInternals.TimerData timer1 =
+ TimerInternals.TimerData.of(
+ "timer1", nameSpace, new Instant(10), TimeDomain.PROCESSING_TIME);
+ timerInternals.setTimer(timer1);
+
+ final TimerInternals.TimerData timer2 =
+ TimerInternals.TimerData.of(
+ "timer2", nameSpace, new Instant(100), TimeDomain.PROCESSING_TIME);
+ timerInternals.setTimer(timer2);
+
+ assertEquals(2, timerRegistry.timers.size());
+
+ store.close();
+
+ // restore by creating a new instance
+ store = createStore("store3");
+ TestTimerRegistry restoredRegistry = new TestTimerRegistry();
+ final SamzaTimerInternalsFactory<String> restoredFactory =
+ createTimerInternalsFactory(restoredRegistry, "timer", pipelineOptions, store);
+
+ assertEquals(2, restoredRegistry.timers.size());
+
+ // remove both timers from the persisted state again
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ StringUtf8Coder.of().encode("testKey", baos);
+ final byte[] keyBytes = baos.toByteArray();
+ restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer1));
+ restoredFactory.removeProcessingTimer(new KeyedTimerData<>(keyBytes, "testKey", timer2));
+
+ store.close();
+ }
+}
diff --git a/website/src/_data/capability-matrix.yml b/website/src/_data/capability-matrix.yml
index 1b85303..007de40 100644
--- a/website/src/_data/capability-matrix.yml
+++ b/website/src/_data/capability-matrix.yml
@@ -1210,9 +1210,9 @@ categories:
l2: non-merging windows
l3: ''
- class: samza
- l1: 'No'
- l2: ''
- l3: ''
+ l1: 'Partially'
+ l2: non-merging windows
+ l3: The Samza Runner supports timers in non-merging windows.
- class: nemo
l1: 'No'
l2: not implemented