You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/08 01:26:31 UTC
[1/2] beam git commit: BEAM-1887 Switch Apex ParDo to new DoFn.
Repository: beam
Updated Branches:
refs/heads/master 1594849da -> 7f55746ce
BEAM-1887 Switch Apex ParDo to new DoFn.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8484ef91
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8484ef91
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8484ef91
Branch: refs/heads/master
Commit: 8484ef9168940cf929648fd53982acc6740c5107
Parents: 1594849
Author: Thomas Weise <th...@apache.org>
Authored: Tue Apr 4 00:33:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 18:25:55 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 2 -
.../apex/translation/GroupByKeyTranslator.java | 4 +-
.../apex/translation/ParDoTranslator.java | 15 +-
.../apex/translation/TranslationContext.java | 16 +-
.../translation/WindowAssignTranslator.java | 58 ++----
.../operators/ApexGroupByKeyOperator.java | 21 +--
.../operators/ApexParDoOperator.java | 187 +++++++++++++++----
.../operators/ApexProcessFnOperator.java | 184 ++++++++++++++++++
.../translation/utils/ApexStateInternals.java | 73 ++++++--
.../translation/utils/StateInternalsProxy.java | 67 +++++++
.../translation/ApexGroupByKeyOperatorTest.java | 2 +-
.../apex/translation/ParDoTranslatorTest.java | 2 +-
.../utils/ApexStateInternalsTest.java | 25 ++-
.../streaming/SingletonKeyedWorkItemCoder.java | 2 +-
14 files changed, 512 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index d23fc14..1c99f8d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -67,8 +67,6 @@ import org.apache.hadoop.conf.Configuration;
* A {@link PipelineRunner} that translates the
* pipeline to an Apex DAG and executes it on an Apex cluster.
*
- * <p>Currently execution is always in embedded mode,
- * launch on Hadoop cluster will be added in subsequent iteration.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
index b46e3eb..2e0bae7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslator.java
@@ -31,9 +31,9 @@ class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
- PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
+ PCollection<KV<K, V>> input = context.getInput();
ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
- input, context.<K>stateInternalsFactory()
+ input, context.getStateBackend()
);
context.addOperator(group, group.output);
context.addStream(input, group.input);
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 75722c7..fa9d21d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -82,23 +82,22 @@ class ParDoTranslator<InputT, OutputT>
}
List<TaggedPValue> outputs = context.getOutputs();
- PCollection<InputT> input = (PCollection<InputT>) context.getInput();
+ PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
FullWindowedValueCoder.of(
inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator =
- new ApexParDoOperator<>(
+ ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
context.getPipelineOptions(),
doFn,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
- ((PCollection<InputT>) context.getInput()).getWindowingStrategy(),
+ input.getWindowingStrategy(),
sideInputs,
wvInputCoder,
- context.<Void>stateInternalsFactory());
+ context.getStateBackend());
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
for (TaggedPValue output : outputs) {
@@ -126,15 +125,15 @@ class ParDoTranslator<InputT, OutputT>
context.addOperator(operator, ports);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
- addSideInputs(operator, sideInputs, context);
+ addSideInputs(operator.sideInput1, sideInputs, context);
}
}
static void addSideInputs(
- ApexParDoOperator<?, ?> operator,
+ Operator.InputPort<?> sideInputPort,
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
- Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+ Operator.InputPort<?>[] sideInputPorts = {sideInputPort};
if (sideInputs.size() > sideInputPorts.length) {
PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
context.addStream(unionCollection, sideInputPorts[0]);
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index fc49fc7..81507ef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -31,9 +31,9 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
-import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -89,16 +89,16 @@ class TranslationContext {
return getCurrentTransform().getInputs();
}
- public PValue getInput() {
- return Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
+ public <InputT extends PValue> InputT getInput() {
+ return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
}
public List<TaggedPValue> getOutputs() {
return getCurrentTransform().getOutputs();
}
- public PValue getOutput() {
- return Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
+ public <OutputT extends PValue> OutputT getOutput() {
+ return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
}
private AppliedPTransform<?, ?, ?> getCurrentTransform() {
@@ -192,10 +192,10 @@ class TranslationContext {
}
/**
- * Return the {@link StateInternalsFactory} for the pipeline translation.
+ * Return the state backend for the pipeline translation.
* @return
*/
- public <K> StateInternalsFactory<K> stateInternalsFactory() {
- return new ApexStateInternals.ApexStateInternalsFactory();
+ public ApexStateBackend getStateBackend() {
+ return new ApexStateInternals.ApexStateBackend();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
index 6106f75..f34f9ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
@@ -18,61 +18,35 @@
package org.apache.beam.runners.apex.translation;
-import java.util.Collections;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
-import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
/**
- * {@link Window} is translated to {link ApexParDoOperator} that wraps an {@link
- * AssignWindowsDoFn}.
+ * {@link Window} is translated to {@link ApexProcessFnOperator#assignWindows}.
*/
class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(Window.Assign<T> transform, TranslationContext context) {
- PCollection<T> output = (PCollection<T>) context.getOutput();
- PCollection<T> input = (PCollection<T>) context.getInput();
- @SuppressWarnings("unchecked")
- WindowingStrategy<T, BoundedWindow> windowingStrategy =
- (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+ PCollection<T> output = context.getOutput();
+ PCollection<T> input = context.getInput();
- OldDoFn<T, T> fn =
- (transform.getWindowFn() == null)
- ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
- : new AssignWindowsDoFn<>(transform.getWindowFn());
+ if (transform.getWindowFn() == null) {
+ // no work to do
+ context.addAlias(output, input);
+ } else {
+ @SuppressWarnings("unchecked")
+ WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) transform.getWindowFn();
+ ApexProcessFnOperator<T> operator = ApexProcessFnOperator.assignWindows(windowFn,
+ context.getPipelineOptions());
+ context.addOperator(operator, operator.outputPort);
+ context.addStream(context.getInput(), operator.inputPort);
+ }
- ApexParDoOperator<T, T> operator =
- new ApexParDoOperator<T, T>(
- context.getPipelineOptions().as(ApexPipelineOptions.class),
- fn,
- new TupleTag<T>(),
- TupleTagList.empty().getAll(),
- windowingStrategy,
- Collections.<PCollectionView<?>>emptyList(),
- WindowedValue.getFullCoder(
- input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
- context.<Void>stateInternalsFactory());
- context.addOperator(operator, operator.output);
- context.addStream(context.getInput(), operator.input);
}
- private static class IdentityFn<T> extends DoFn<T, T> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 3508c3e..4551c9c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
@@ -95,7 +96,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
private final SerializablePipelineOptions serializedOptions;
@Bind(JavaSerializer.class)
private final StateInternalsFactory<K> stateInternalsFactory;
- private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
private transient ProcessContext context;
@@ -135,13 +135,13 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
@SuppressWarnings("unchecked")
public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
- StateInternalsFactory<K> stateInternalsFactory) {
+ ApexStateBackend stateBackend) {
checkNotNull(pipelineOptions);
this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
- this.stateInternalsFactory = stateInternalsFactory;
+ this.stateInternalsFactory = stateBackend.newStateInternalsFactory(keyCoder);
}
@SuppressWarnings("unused") // for Kryo
@@ -222,18 +222,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
private StateInternals<K> getStateInternalsForKey(K key) {
- final ByteBuffer keyBytes;
- try {
- keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
- if (stateInternals == null) {
- stateInternals = stateInternalsFactory.stateInternalsForKey(key);
- perKeyStateInternals.put(keyBytes, stateInternals);
- }
- return stateInternals;
+ return stateInternalsFactory.stateInternalsForKey(key);
}
private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
@@ -423,7 +412,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
* An implementation of Beam's {@link TimerInternals}.
*
*/
- public class ApexTimerInternals implements TimerInternals {
+ private class ApexTimerInternals implements TimerInternals {
@Deprecated
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 7f2512a..1fc91c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -25,41 +25,52 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +84,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Bind(JavaSerializer.class)
private final SerializablePipelineOptions pipelineOptions;
@Bind(JavaSerializer.class)
- private final OldDoFn<InputT, OutputT> doFn;
+ private final DoFn<InputT, OutputT> doFn;
@Bind(JavaSerializer.class)
private final TupleTag<OutputT> mainOutputTag;
@Bind(JavaSerializer.class)
@@ -83,6 +94,12 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Bind(JavaSerializer.class)
private final List<PCollectionView<?>> sideInputs;
+ private StateInternalsProxy<?> currentKeyStateInternals;
+ // TODO: if the operator gets restored to checkpointed state due to a failure,
+ // the timer state is lost.
+ private final transient CurrentKeyTimerInternals<Object> currentKeyTimerInternals =
+ new CurrentKeyTimerInternals<>();
+
private final StateInternals<Void> sideInputStateInternals;
private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
private LongMin pushedBackWatermark = new LongMin();
@@ -93,17 +110,17 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private transient SideInputHandler sideInputHandler;
private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
Maps.newHashMapWithExpectedSize(5);
+ private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
- @Deprecated
public ApexParDoOperator(
ApexPipelineOptions pipelineOptions,
- OldDoFn<InputT, OutputT> doFn,
+ DoFn<InputT, OutputT> doFn,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<InputT>> inputCoder,
- StateInternalsFactory<Void> stateInternalsFactory
+ ApexStateBackend stateBackend
) {
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.doFn = doFn;
@@ -111,7 +128,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this.sideOutputTags = sideOutputTags;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
+ this.sideInputStateInternals = new StateInternalsProxy<>(
+ stateBackend.newStateInternalsFactory(VoidCoder.of()));
if (sideOutputTags.size() > sideOutputPorts.length) {
String msg = String.format("Too many side outputs (currently only supporting %s).",
@@ -125,27 +143,6 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
- public ApexParDoOperator(
- ApexPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- WindowingStrategy<?, ?> windowingStrategy,
- List<PCollectionView<?>> sideInputs,
- Coder<WindowedValue<InputT>> inputCoder,
- StateInternalsFactory<Void> stateInternalsFactory
- ) {
- this(
- pipelineOptions,
- DoFnAdapters.toOldDoFn(doFn),
- mainOutputTag,
- sideOutputTags,
- windowingStrategy,
- sideInputs,
- inputCoder,
- stateInternalsFactory);
- }
-
@SuppressWarnings("unused") // for Kryo
private ApexParDoOperator() {
this.pipelineOptions = null;
@@ -255,6 +252,17 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
try {
pushbackDoFnRunner.startBundle();
+ if (currentKeyStateInternals != null) {
+ InputT value = elem.getValue();
+ Object key;
+ if (value instanceof KeyedWorkItem) {
+ key = ((KeyedWorkItem) value).key();
+ } else {
+ key = ((KV) value).getKey();
+ }
+ ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
+ currentKeyTimerInternals.currentKey = key;
+ }
Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
.processElementInReadyWindows(elem);
pushbackDoFnRunner.finishBundle();
@@ -305,6 +313,19 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
sideOutputPortMapping.put(sideOutputTags.get(i), port);
}
+ NoOpStepContext stepContext = new NoOpStepContext() {
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ return currentKeyStateInternals;
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return currentKeyTimerInternals;
+ }
+
+ };
DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
pipelineOptions.get(),
doFn,
@@ -312,21 +333,47 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this,
mainOutputTag,
sideOutputTags,
- new NoOpStepContext(),
+ stepContext,
new NoOpAggregatorFactory(),
windowingStrategy
);
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker.invokeSetup();
+
+ if (this.currentKeyStateInternals != null) {
+
+ StatefulDoFnRunner.CleanupTimer cleanupTimer =
+ new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+ stepContext.timerInternals(), windowingStrategy);
+
+ @SuppressWarnings({"rawtypes"})
+ Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+ @SuppressWarnings({"unchecked"})
+ StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+ new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+ doFn, stepContext.stateInternals(), windowCoder);
+
+ doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
+ doFn,
+ doFnRunner,
+ stepContext,
+ new NoOpAggregatorFactory(),
+ windowingStrategy,
+ cleanupTimer,
+ stateCleaner);
+ }
+
pushbackDoFnRunner =
PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
- try {
- doFn.setup();
- } catch (Exception e) {
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
- }
+ }
+ @Override
+ public void teardown() {
+ doFnInvoker.invokeTeardown();
+ super.teardown();
}
@Override
@@ -393,4 +440,70 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
+ private class CurrentKeyTimerInternals<K> implements TimerInternals {
+
+ private TimerInternalsFactory<K> factory = new TimerInternalsFactory<K>() {
+ @Override
+ public TimerInternals timerInternalsForKey(K key) {
+ InMemoryTimerInternals timerInternals = perKeyTimerInternals.get(key);
+ if (timerInternals == null) {
+ perKeyTimerInternals.put(key, timerInternals = new InMemoryTimerInternals());
+ }
+ return timerInternals;
+ }
+ };
+
+ // TODO: durable state store
+ final Map<K, InMemoryTimerInternals> perKeyTimerInternals = new HashMap<>();
+ private K currentKey;
+
+ @Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ factory.timerInternalsForKey(currentKey).setTimer(
+ namespace, timerId, target, timeDomain);
+ }
+
+ @Override
+ public void setTimer(TimerData timerData) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return new Instant(currentInputWatermark);
+ }
+
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
new file mode 100644
index 0000000..835c9e0
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for simple native map operations.
+ */
+public class ApexProcessFnOperator<InputT> extends BaseOperator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApexProcessFnOperator.class);
+ private boolean traceTuples = false;
+ @Bind(JavaSerializer.class)
+ private final ApexOperatorFn<InputT> fn;
+
+ public ApexProcessFnOperator(ApexOperatorFn<InputT> fn, boolean traceTuples) {
+ super();
+ this.traceTuples = traceTuples;
+ this.fn = fn;
+ }
+
+ @SuppressWarnings("unused")
+ private ApexProcessFnOperator() {
+ // for Kryo
+ fn = null;
+ }
+
+ private final transient OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter =
+ new OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>>() {
+ @Override
+ public void emit(ApexStreamTuple<? extends WindowedValue<?>> tuple) {
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+ outputPort.emit(tuple);
+ }
+ };
+
+ /**
+ * Something that emits results.
+ */
+ public interface OutputEmitter<T> {
+ void emit(T tuple);
+ };
+
+ /**
+ * The processing logic for this operator.
+ */
+ public interface ApexOperatorFn<InputT> extends Serializable {
+ void process(ApexStreamTuple<WindowedValue<InputT>> input,
+ OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception;
+ }
+
+ /**
+ * Convert {@link KV} into {@link KeyedWorkItem}s.
+ */
+ public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
+ @Override
+ public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple,
+ OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {
+
+ if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
+ outputEmitter.emit(tuple);
+ } else {
+ for (WindowedValue<KV<K, V>> in : tuple.getValue().explodeWindows()) {
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(in.getValue().getKey(),
+ Collections.singletonList(in.withValue(in.getValue().getValue())));
+ outputEmitter.emit(ApexStreamTuple.DataTuple.of(in.withValue(kwi)));
+ }
+ }
+ }
+ }
+
+ public static <T, W extends BoundedWindow> ApexProcessFnOperator<T> assignWindows(
+ WindowFn<T, W> windowFn, ApexPipelineOptions options) {
+ ApexOperatorFn<T> fn = new AssignWindows<T, W>(windowFn);
+ return new ApexProcessFnOperator<T>(fn, options.isTupleTracingEnabled());
+ }
+
+ /**
+ * Function for implementing {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ */
+ private static class AssignWindows<T, W extends BoundedWindow> implements ApexOperatorFn<T> {
+ private final WindowFn<T, W> windowFn;
+
+ private AssignWindows(WindowFn<T, W> windowFn) {
+ this.windowFn = windowFn;
+ }
+
+ @Override
+ public final void process(ApexStreamTuple<WindowedValue<T>> tuple,
+ OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) throws Exception {
+ if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
+ outputEmitter.emit(tuple);
+ } else {
+ final WindowedValue<T> input = tuple.getValue();
+ Collection<W> windows =
+ (windowFn).assignWindows(
+ (windowFn).new AssignContext() {
+ @Override
+ public T element() {
+ return input.getValue();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return input.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return Iterables.getOnlyElement(input.getWindows());
+ }
+ });
+ for (W w: windows) {
+ WindowedValue<T> wv = WindowedValue.of(input.getValue(), input.getTimestamp(),
+ w, input.getPane());
+ outputEmitter.emit(ApexStreamTuple.DataTuple.of(wv));
+ }
+ }
+ }
+ }
+
+ /**
+ * Input port.
+ */
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> inputPort =
+ new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
+ try {
+ fn.process(tuple, outputEmitter);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ /**
+ * Output port.
+ */
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<? extends WindowedValue<?>>>
+ outputPort = new DefaultOutputPort<>();
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index c59afc5..cfc57cd 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.apex.translation.utils;
+import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
@@ -27,7 +28,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
@@ -35,6 +38,7 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -42,6 +46,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
@@ -56,22 +61,18 @@ import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.joda.time.Instant;
/**
- * Implementation of {@link StateInternals} that can be serialized and
- * checkpointed with the operator. Suitable for small states, in the future this
- * should be based on the incremental state saving components in the Apex
- * library.
+ * Implementation of {@link StateInternals} for transient use.
+ *
+ * <p>For fields that need to be serialized, use {@link ApexStateInternalsFactory}
+ * or {@link StateInternalsProxy}.
*/
-@DefaultSerializer(JavaSerializer.class)
-public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
- private static final long serialVersionUID = 1L;
- public static <K> ApexStateInternals<K> forKey(K key) {
- return new ApexStateInternals<>(key);
- }
-
+public class ApexStateInternals<K> implements StateInternals<K> {
private final K key;
+ private final Table<String, String, byte[]> stateTable;
- protected ApexStateInternals(K key) {
+ protected ApexStateInternals(K key, Table<String, String, byte[]> stateTable) {
this.key = key;
+ this.stateTable = stateTable;
}
@Override
@@ -79,11 +80,6 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
return key;
}
- /**
- * Serializable state for internals (namespace to state tag to coded value).
- */
- private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
-
@Override
public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
return state(namespace, address, StateContexts.nullContext());
@@ -437,17 +433,54 @@ public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
}
/**
- * Factory for {@link ApexStateInternals}.
+ * Implementation of {@link StateInternals} that can be serialized and
+ * checkpointed with the operator. Suitable for small states, in the future this
+ * should be based on the incremental state saving components in the Apex
+ * library.
*
* @param <K> key type
*/
+ @DefaultSerializer(JavaSerializer.class)
public static class ApexStateInternalsFactory<K>
implements StateInternalsFactory<K>, Serializable {
private static final long serialVersionUID = 1L;
+ /**
+ * Serializable state for internals (namespace to state tag to coded value).
+ */
+ private Map<Slice, Table<String, String, byte[]>> perKeyState = new HashMap<>();
+ private final Coder<K> keyCoder;
+
+ private ApexStateInternalsFactory(Coder<K> keyCoder) {
+ this.keyCoder = keyCoder;
+ }
@Override
- public StateInternals<K> stateInternalsForKey(K key) {
- return ApexStateInternals.forKey(key);
+ public ApexStateInternals<K> stateInternalsForKey(K key) {
+ final Slice keyBytes;
+ try {
+ keyBytes = (key != null) ? new Slice(CoderUtils.encodeToByteArray(keyCoder, key)) :
+ new Slice(null);
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ Table<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
+ if (stateTable == null) {
+ stateTable = HashBasedTable.create();
+ perKeyState.put(keyBytes, stateTable);
+ }
+ return new ApexStateInternals<>(key, stateTable);
+ }
+
+ }
+
+ /**
+ * Factory to create the state internals.
+ */
+ public static class ApexStateBackend implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public <K> ApexStateInternalsFactory<K> newStateInternalsFactory(Coder<K> keyCoder) {
+ return new ApexStateInternalsFactory<K>(keyCoder);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
new file mode 100644
index 0000000..1f28364
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import java.io.Serializable;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+
+/**
+ * {@link StateInternals} that forwards every state lookup to the per-key state held by an
+ * {@link ApexStateInternals.ApexStateInternalsFactory}, so that one serializable instance can be
+ * reused across keys within an operator's processing context.
+ *
+ * <p>State is always resolved for the key most recently passed to {@link #setKey(Object)}. That
+ * key is {@code transient} and is therefore not carried across serialization; only the factory
+ * (which holds the per-key state) is serialized.
+ *
+ * @param <K> the key type
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class StateInternalsProxy<K> implements StateInternals<K>, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final StateInternalsFactory<K> factory;
+  private transient K currentKey;
+
+  /**
+   * Creates a proxy backed by the given factory.
+   *
+   * @param factory serializable factory that owns the per-key state tables
+   */
+  public StateInternalsProxy(ApexStateInternals.ApexStateInternalsFactory<K> factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Returns the factory backing this proxy.
+   *
+   * @return the factory supplied at construction
+   */
+  public StateInternalsFactory<K> getFactory() {
+    return this.factory;
+  }
+
+  /**
+   * Sets the key against which subsequent {@code state(...)} calls are resolved.
+   *
+   * @param key the key to use for subsequent state lookups
+   */
+  public void setKey(K key) {
+    currentKey = key;
+  }
+
+  /**
+   * Returns the key most recently set via {@link #setKey(Object)}, or {@code null} if none has
+   * been set since construction or deserialization.
+   */
+  @Override
+  public K getKey() {
+    return currentKey;
+  }
+
+  /**
+   * Looks up the state for {@code address} in {@code namespace} under the current key.
+   */
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+    // Delegate to the factory on every call so the lookup always reflects the current key.
+    return factory.stateInternalsForKey(currentKey).state(namespace, address);
+  }
+
+  /**
+   * Looks up the state for {@code address} in {@code namespace} under the current key, using the
+   * supplied state context.
+   */
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address,
+      StateContext<?> c) {
+    return factory.stateInternalsForKey(currentKey).state(namespace, address, c);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
index fb80d0c..4b73114 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -66,7 +66,7 @@ public class ApexGroupByKeyOperatorTest {
input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
- input, new ApexStateInternals.ApexStateInternalsFactory<String>()
+ input, new ApexStateInternals.ApexStateBackend()
);
operator.setup(null);
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 3bcba00..2760d06 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -216,7 +216,7 @@ public class ParDoTranslatorTest {
WindowingStrategy.globalDefault(),
Collections.<PCollectionView<?>>singletonList(singletonView),
coder,
- new ApexStateInternals.ApexStateInternalsFactory<Void>());
+ new ApexStateInternals.ApexStateBackend());
operator.setup(null);
operator.beginWindow(0);
WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 4f4ecfb..7160e45 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
import com.datatorrent.lib.util.KryoCloneUtils;
import java.util.Arrays;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaceForTest;
@@ -76,7 +78,9 @@ public class ApexStateInternalsTest {
@Before
public void initStateInternals() {
- underTest = new ApexStateInternals<>(null);
+ underTest = new ApexStateInternals.ApexStateBackend()
+ .newStateInternalsFactory(StringUtf8Coder.of())
+ .stateInternalsForKey((String) null);
}
@Test
@@ -344,16 +348,21 @@ public class ApexStateInternalsTest {
@Test
public void testSerialization() throws Exception {
- ApexStateInternals<String> original = new ApexStateInternals<String>(null);
- ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR);
- assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+ newStateInternalsFactory(StringUtf8Coder.of());
+ ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+ ValueState<String> value = keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
value.write("hello");
- ApexStateInternals<String> cloned;
- assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(original));
- ValueState<String> clonedValue = cloned.state(NAMESPACE_1, STRING_VALUE_ADDR);
+ ApexStateInternalsFactory<String> cloned;
+ assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+ ApexStateInternals<String> clonedKeyAndState = cloned.stateInternalsForKey("dummy");
+
+ ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR);
assertThat(clonedValue.read(), Matchers.equalTo("hello"));
- assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+ assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8484ef91/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index d95ed7c..fe96eb1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.WindowedValue;
/**
- * Singleton keyed word iteam coder.
+ * Singleton keyed work item coder.
* @param <K>
* @param <ElemT>
*/
[2/2] beam git commit: This closes #2450
Posted by jk...@apache.org.
This closes #2450
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7f55746c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7f55746c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7f55746c
Branch: refs/heads/master
Commit: 7f55746ce3eb2a0b6ee2f6372d6f7127835cbb34
Parents: 1594849 8484ef9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 18:26:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 18:26:15 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 2 -
.../apex/translation/GroupByKeyTranslator.java | 4 +-
.../apex/translation/ParDoTranslator.java | 15 +-
.../apex/translation/TranslationContext.java | 16 +-
.../translation/WindowAssignTranslator.java | 58 ++----
.../operators/ApexGroupByKeyOperator.java | 21 +--
.../operators/ApexParDoOperator.java | 187 +++++++++++++++----
.../operators/ApexProcessFnOperator.java | 184 ++++++++++++++++++
.../translation/utils/ApexStateInternals.java | 73 ++++++--
.../translation/utils/StateInternalsProxy.java | 67 +++++++
.../translation/ApexGroupByKeyOperatorTest.java | 2 +-
.../apex/translation/ParDoTranslatorTest.java | 2 +-
.../utils/ApexStateInternalsTest.java | 25 ++-
.../streaming/SingletonKeyedWorkItemCoder.java | 2 +-
14 files changed, 512 insertions(+), 146 deletions(-)
----------------------------------------------------------------------