You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/25 19:11:45 UTC
[1/4] incubator-beam git commit: Port runners to use GroupAlsoByWindows via StateInternalsFactory
Repository: incubator-beam
Updated Branches:
refs/heads/master cf1464465 -> 7809f6bd2
Port runners to use GroupAlsoByWindows via StateInternalsFactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/902997d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/902997d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/902997d0
Branch: refs/heads/master
Commit: 902997d040023c83d23e57362bdfb2d62c53d142
Parents: d2594e0
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 18 14:33:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 16 ++--
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 10 ++-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 10 +--
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 79 +++++++++++++++++---
.../sdk/util/GroupAlsoByWindowsProperties.java | 50 +++++++++++--
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 30 ++++++++
.../beam/runners/direct/ParDoEvaluator.java | 4 +-
.../direct/ParDoMultiEvaluatorFactory.java | 6 ++
.../direct/ParDoSingleEvaluatorFactory.java | 6 ++
.../beam/runners/direct/ParDoEvaluatorTest.java | 1 +
.../FlinkGroupAlsoByWindowWrapper.java | 31 +++++---
.../spark/translation/TransformTranslator.java | 70 ++++++++++++++++-
.../apache/beam/sdk/transforms/DoFnTester.java | 41 +++++++++-
14 files changed, 309 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 73244f7..0d320bc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
/**
@@ -44,8 +45,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
public static <K, InputT, OutputT, W extends BoundedWindow>
DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
- WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
- return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
+ WindowingStrategy<?, W> strategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+ return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn);
}
protected final Aggregator<Long, Long> droppedDueToClosedWindow =
@@ -55,15 +58,18 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
private final WindowingStrategy<Object, W> windowingStrategy;
+ private final StateInternalsFactory<K> stateInternalsFactory;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
private GroupAlsoByWindowViaWindowSetDoFn(
WindowingStrategy<?, W> windowingStrategy,
+ StateInternalsFactory<K> stateInternalsFactory,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
@SuppressWarnings("unchecked")
WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
this.windowingStrategy = noWildcard;
this.reduceFn = reduceFn;
+ this.stateInternalsFactory = stateInternalsFactory;
}
@Override
@@ -72,11 +78,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
K key = c.element().key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
-
- // It is the responsibility of the user of GroupAlsoByWindowsViaWindowSet to only
- // provide a WindowingInternals instance with the appropriate key type for StateInternals.
- @SuppressWarnings("unchecked")
- StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+ StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
new ReduceFnRunner<>(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
index f5de0bc..b575559 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
/**
@@ -51,9 +52,12 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
* @param windowingStrategy The window function and trigger to use for grouping
* @param inputCoder the input coder to use
*/
- public static <K, V, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W>
- createDefault(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
+ public static <K, V, W extends BoundedWindow>
+ GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault(
+ WindowingStrategy<?, W> windowingStrategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ Coder<V> inputCoder) {
return new GroupAlsoByWindowsViaOutputBufferDoFn<>(
- windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+ windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
index d364168..d185a24 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import com.google.common.collect.Iterables;
@@ -37,13 +38,16 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
private final WindowingStrategy<?, W> strategy;
+ private final StateInternalsFactory<K> stateInternalsFactory;
private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
public GroupAlsoByWindowsViaOutputBufferDoFn(
WindowingStrategy<?, W> windowingStrategy,
+ StateInternalsFactory<K> stateInternalsFactory,
SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
this.strategy = windowingStrategy;
this.reduceFn = reduceFn;
+ this.stateInternalsFactory = stateInternalsFactory;
}
@Override
@@ -55,11 +59,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
// timer manager from the context because it doesn't exist. So we create one and emulate the
// watermark, knowing that we have all data and it is in timestamp order.
BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
-
- // It is the responsibility of the user of GroupAlsoByWindowsViaOutputBufferDoFn to only
- // provide a WindowingInternals instance with the appropriate key type for StateInternals.
- @SuppressWarnings("unchecked")
- StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+ StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
new ReduceFnRunner<K, InputT, OutputT, W>(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
index 9450495..8a0152e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -68,7 +71,7 @@ import java.util.List;
public class GroupByKeyViaGroupByKeyOnly<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
- private GroupByKey<K, V> gbkTransform;
+ private final GroupByKey<K, V> gbkTransform;
public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
this.gbkTransform = originalTransform;
@@ -161,13 +164,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
}
/**
- * Helper transform that takes a collection of timestamp-ordered
- * values associated with each key, groups the values by window,
- * combines windows as needed, and for each window in each key,
- * outputs a collection of key/value-list pairs implicitly assigned
- * to the window and with the timestamp derived from that window.
+ * Runner-specific primitive that takes a collection of timestamp-ordered values associated with
+ * each key, groups the values by window, merges windows as needed, and for each window in each
+ * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with
+ * the timestamp derived from that window.
*/
- private static class GroupAlsoByWindow<K, V>
+ public static class GroupAlsoByWindow<K, V>
extends PTransform<
PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
private final WindowingStrategy<?, ?> windowingStrategy;
@@ -176,8 +178,57 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
this.windowingStrategy = windowingStrategy;
}
+ public WindowingStrategy<?, ?> getWindowingStrategy() {
+ return windowingStrategy;
+ }
+
+ private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder(
+ Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ // Coder<KV<...>> --> KvCoder<...>
+ checkArgument(inputCoder instanceof KvCoder,
+ "%s requires a %s<...> but got %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ inputCoder);
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> kvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder;
+ return kvCoder;
+ }
+
+ public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ return getKvCoder(inputCoder).getKeyCoder();
+ }
+
+ public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+ // Coder<Iterable<...>> --> IterableCoder<...>
+ Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder =
+ getKvCoder(inputCoder).getValueCoder();
+ checkArgument(iterableWindowedValueCoder instanceof IterableCoder,
+ "%s requires a %s<..., %s> but got a %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ IterableCoder.class.getSimpleName(),
+ iterableWindowedValueCoder);
+ IterableCoder<WindowedValue<V>> iterableCoder =
+ (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder;
+
+ // Coder<WindowedValue<...>> --> WindowedValueCoder<...>
+ Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder();
+ checkArgument(iterableElementCoder instanceof WindowedValueCoder,
+ "%s requires a %s<..., %s<%s>> but got a %s",
+ getClass().getSimpleName(),
+ KvCoder.class.getSimpleName(),
+ IterableCoder.class.getSimpleName(),
+ WindowedValueCoder.class.getSimpleName(),
+ iterableElementCoder);
+ WindowedValueCoder<V> windowedValueCoder =
+ (WindowedValueCoder<V>) iterableElementCoder;
+
+ return windowedValueCoder.getValueCoder();
+ }
+
@Override
- @SuppressWarnings("unchecked")
public PCollection<KV<K, Iterable<V>>> apply(
PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
@SuppressWarnings("unchecked")
@@ -197,16 +248,20 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
- return input
- .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), windowingStrategy, input.isBounded())
.setCoder(outputKvCoder);
}
private <W extends BoundedWindow>
GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+ WindowingStrategy<?, W> strategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ Coder<V> inputIterableElementValueCoder) {
return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+ strategy,
+ stateInternalsFactory,
+ SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index fe2a495..43c287e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -32,10 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -61,7 +67,7 @@ public class GroupAlsoByWindowsProperties {
*/
public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> {
<W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>
- forStrategy(WindowingStrategy<?, W> strategy);
+ forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory);
}
/**
@@ -77,10 +83,15 @@ public class GroupAlsoByWindowsProperties {
WindowingStrategy<?, IntervalWindow> windowingStrategy =
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
+ // This key should never actually be used, though it is eagerly passed to the
+ // StateInternalsFactory so must be non-null
+ @SuppressWarnings("unchecked")
+ K fakeKey = (K) "this key should never be used";
+
DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW(
gabwFactory,
windowingStrategy,
- (K) null, // key should never be used
+ fakeKey,
Collections.<WindowedValue<InputT>>emptyList());
assertThat(result.peekOutputElements(), hasSize(0));
@@ -599,11 +610,14 @@ public class GroupAlsoByWindowsProperties {
K key,
Collection<WindowedValue<InputT>> values) throws Exception {
- TupleTag<KV<K, OutputT>> outputTag = new TupleTag<>();
- DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager();
+ final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>();
DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester =
- DoFnTester.of(gabwFactory.forStrategy(windowingStrategy));
+ DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache));
+
+ // Though we use a DoFnTester, the function itself is instantiated directly by the
+ // runner and should not be serialized; it may not even be serializable.
+ tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
tester.startBundle();
tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values));
tester.finishBundle();
@@ -620,4 +634,28 @@ public class GroupAlsoByWindowsProperties {
return new IntervalWindow(new Instant(start), new Instant(end));
}
+ private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> {
+ private final LoadingCache<K, StateInternals<K>> stateInternalsCache;
+
+ private CachingStateInternalsFactory() {
+ this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateInternals<K> stateInternalsForKey(K key) {
+ try {
+ return stateInternalsCache.get(key);
+ } catch (Exception exc) {
+ throw new RuntimeException(exc);
+ }
+ }
+ }
+
+ private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> {
+ @Override
+ public StateInternals<K> load(K key) throws Exception {
+ return InMemoryStateInternals.forKey(key);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
index 4ac6164..1f02a8f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
@@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,10 +43,13 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
}
@Override
- public <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W>
- forStrategy(WindowingStrategy<?, W> windowingStrategy) {
+ public <W extends BoundedWindow>
+ GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+ WindowingStrategy<?, W> windowingStrategy,
+ StateInternalsFactory<K> stateInternalsFactory) {
return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
windowingStrategy,
+ stateInternalsFactory,
SystemReduceFn.<K, InputT, W>buffering(inputCoder));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 9782ab1..5d3ab3f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
@@ -26,11 +27,14 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -94,9 +98,19 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
WindowingStrategy<?, BoundedWindow> windowingStrategy =
(WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
+ DirectStepContext stepContext =
+ evaluationContext
+ .getExecutionContext(application, inputBundle.getKey())
+ .getOrCreateStepContext(
+ evaluationContext.getStepName(application), application.getTransform().getName());
+
+ StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals();
+
DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
GroupAlsoByWindowViaWindowSetDoFn.create(
windowingStrategy,
+ // new DirectStateInternalsFactory<K, V>(stepContext),
+ new ConstantStateInternalsFactory<K>(stateInternals),
SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
@@ -105,6 +119,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
this.gabwParDoEvaluator =
ParDoEvaluator.create(
evaluationContext,
+ stepContext,
inputBundle,
application,
gabwDoFn,
@@ -124,4 +139,19 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
return gabwParDoEvaluator.finishBundle();
}
}
+
+ private static final class ConstantStateInternalsFactory<K>
+ implements StateInternalsFactory<K> {
+ private final StateInternals<K> stateInternals;
+
+ private ConstantStateInternalsFactory(StateInternals<K> stateInternals) {
+ this.stateInternals = stateInternals;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public StateInternals<K> stateInternalsForKey(K key) {
+ return stateInternals;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 58cee4d..485cf4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -46,6 +46,7 @@ import java.util.Map;
class ParDoEvaluator<T> implements TransformEvaluator<T> {
public static <InputT, OutputT> ParDoEvaluator<InputT> create(
EvaluationContext evaluationContext,
+ DirectStepContext stepContext,
CommittedBundle<InputT> inputBundle,
AppliedPTransform<PCollection<InputT>, ?, ?> application,
DoFn<InputT, OutputT> fn,
@@ -55,9 +56,6 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> {
Map<TupleTag<?>, PCollection<?>> outputs) {
DirectExecutionContext executionContext =
evaluationContext.getExecutionContext(application, inputBundle.getKey());
- String stepName = evaluationContext.getStepName(application);
- DirectStepContext stepContext =
- executionContext.getOrCreateStepContext(stepName, stepName);
CounterSet counters = evaluationContext.createCounterSet();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index b87cd3e..eda3db4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
@@ -77,10 +78,15 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
@SuppressWarnings({"unchecked", "rawtypes"})
ThreadLocal<DoFn<InT, OuT>> fnLocal =
(ThreadLocal) fnClones.getUnchecked((AppliedPTransform) application);
+ String stepName = evaluationContext.getStepName(application);
+ DirectStepContext stepContext =
+ evaluationContext.getExecutionContext(application, inputBundle.getKey())
+ .getOrCreateStepContext(stepName, stepName);
try {
TransformEvaluator<InT> parDoEvaluator =
ParDoEvaluator.create(
evaluationContext,
+ stepContext,
inputBundle,
application,
fnLocal.get(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index e9c7dd6..044abdc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
@@ -73,6 +74,10 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
CommittedBundle<InputT> inputBundle,
EvaluationContext evaluationContext) {
TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
+ String stepName = evaluationContext.getStepName(application);
+ DirectStepContext stepContext =
+ evaluationContext.getExecutionContext(application, inputBundle.getKey())
+ .getOrCreateStepContext(stepName, stepName);
@SuppressWarnings({"unchecked", "rawtypes"})
ThreadLocal<DoFn<InputT, OutputT>> fnLocal =
@@ -81,6 +86,7 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
ParDoEvaluator<InputT> parDoEvaluator =
ParDoEvaluator.create(
evaluationContext,
+ stepContext,
inputBundle,
application,
fnLocal.get(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 3c9c9ee..bce37e4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -160,6 +160,7 @@ public class ParDoEvaluatorTest {
return ParDoEvaluator.create(
evaluationContext,
+ stepContext,
inputBundle,
(AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
fn,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 3f845cf..0e977db 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.io.Serializable;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -47,6 +48,8 @@ import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -262,12 +265,15 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
*/
private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
if (this.operator == null) {
+
+ StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory();
+
if (this.combineFn == null) {
// Thus VOUT == Iterable<VIN>
Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
- (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+ (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
} else {
Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
@@ -275,14 +281,14 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
.withInputCoder(combineFn, coderRegistry, inputKvCoder);
this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
- (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+ (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
}
}
return this.operator;
}
private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
- context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+ context.setElement(workItem);
operator.processElement(context);
}
@@ -438,8 +444,6 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
- private FlinkStateInternals<K> stateInternals;
-
private KeyedWorkItem<K, VIN> element;
public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
@@ -452,10 +456,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
this.timerInternals = checkNotNull(timerInternals);
}
- public void setElement(KeyedWorkItem<K, VIN> element,
- FlinkStateInternals<K> stateForKey) {
+ public void setElement(KeyedWorkItem<K, VIN> element) {
this.element = element;
- this.stateInternals = stateForKey;
}
public void setCurrentInputWatermark(Instant watermark) {
@@ -509,8 +511,8 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
@Override
- public org.apache.beam.sdk.util.state.StateInternals stateInternals() {
- return stateInternals;
+ public StateInternals stateInternals() {
+ throw new UnsupportedOperationException("stateInternals() is not available");
}
@Override
@@ -628,4 +630,13 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
// restore the timerInternals.
this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
}
+
+ private class GroupAlsoByWindowWrapperStateInternalsFactory implements
+ StateInternalsFactory<K>, Serializable {
+
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key) {
+ return getStateInternalsForKey(key);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 34a0ede..c5d5802 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+
package org.apache.beam.runners.spark.translation;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
@@ -32,6 +33,7 @@ import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
@@ -47,8 +49,16 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AssignWindowsDoFn;
+import org.apache.beam.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -78,12 +88,12 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
import scala.Tuple2;
/**
@@ -161,6 +171,55 @@ public final class TransformTranslator {
};
}
+ private static <K, V, W extends BoundedWindow>
+ TransformEvaluator<GroupAlsoByWindow<K, V>> gabw() {
+ return new TransformEvaluator<GroupAlsoByWindow<K, V>>() {
+ @Override
+ public void evaluate(GroupAlsoByWindow<K, V> transform, EvaluationContext context) {
+ @SuppressWarnings("unchecked")
+ JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?> inRDD =
+ (JavaRDDLike<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>, ?>)
+ context.getInputRDD(transform);
+
+ Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder =
+ context.getInput(transform).getCoder();
+ Coder<K> keyCoder = transform.getKeyCoder(inputCoder);
+ Coder<V> valueCoder = transform.getValueCoder(inputCoder);
+
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) context.getInput(transform).getCoder();
+ Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+ IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+ (IterableCoder<WindowedValue<V>>) inputValueCoder;
+ Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+ WindowedValueCoder<V> inputIterableWindowedValueCoder =
+ (WindowedValueCoder<V>) inputIterableElementCoder;
+
+ Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, W> windowingStrategy =
+ (WindowingStrategy<?, W>) transform.getWindowingStrategy();
+
+ DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>> gabwDoFn =
+ new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+ windowingStrategy,
+ new InMemoryStateInternalsFactory<K>(),
+ SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+
+ // GroupAlsoByWindow current uses a dummy in-memory StateInternals
+ JavaRDDLike<WindowedValue<KV<K, Iterable<V>>>, ?> outRDD =
+ inRDD.mapPartitions(
+ new DoFnFunction<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<V>>>(
+ gabwDoFn, context.getRuntimeContext(), null));
+
+ context.setOutputRDD(transform, outRDD);
+ }
+ };
+ }
+
private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
@@ -815,6 +874,7 @@ public final class TransformTranslator {
EVALUATORS.put(ParDo.Bound.class, parDo());
EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
EVALUATORS.put(GroupByKeyOnly.class, gbk());
+ EVALUATORS.put(GroupAlsoByWindow.class, gabw());
EVALUATORS.put(Combine.GroupedValues.class, grouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
@@ -853,4 +913,12 @@ public final class TransformTranslator {
return getTransformEvaluator(clazz);
}
}
+
+ private static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>,
+ Serializable {
+ @Override
+ public StateInternals<K> stateInternalsForKey(K key) {
+ return InMemoryStateInternals.forKey(key);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/902997d0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index c38f0ab..c8bd5de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -131,6 +131,28 @@ public class DoFnTester<InputT, OutputT> {
}
/**
+ * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test.
+ */
+ public enum CloningBehavior {
+ CLONE,
+ DO_NOT_CLONE;
+ }
+
+ /**
+ * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test.
+ */
+ public void setCloningBehavior(CloningBehavior newValue) {
+ this.cloningBehavior = newValue;
+ }
+
+ /**
+ * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test.
+ */
+ public CloningBehavior getCloningBehavior() {
+ return cloningBehavior;
+ }
+
+ /**
* A convenience operation that first calls {@link #startBundle},
* then calls {@link #processElement} on each of the input elements, then
* calls {@link #finishBundle}, then returns the result of
@@ -644,6 +666,13 @@ public class DoFnTester<InputT, OutputT> {
/** The original DoFn under test. */
private final DoFn<InputT, OutputT> origFn;
+ /**
+ * Whether to clone the original {@link DoFn} or just use it as-is.
+ *
+ * <p></p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be.
+ */
+ private CloningBehavior cloningBehavior = CloningBehavior.CLONE;
+
/** The side input values to provide to the DoFn under test. */
private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs =
new HashMap<>();
@@ -676,10 +705,14 @@ public class DoFnTester<InputT, OutputT> {
@SuppressWarnings("unchecked")
private void initializeState() {
- fn = (DoFn<InputT, OutputT>)
- SerializableUtils.deserializeFromByteArray(
- SerializableUtils.serializeToByteArray(origFn),
- origFn.toString());
+ if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) {
+ fn = origFn;
+ } else {
+ fn = (DoFn<InputT, OutputT>)
+ SerializableUtils.deserializeFromByteArray(
+ SerializableUtils.serializeToByteArray(origFn),
+ origFn.toString());
+ }
outputs = new HashMap<>();
accumulators = new HashMap<>();
}
[2/4] incubator-beam git commit: Introduce StateInternalsFactory
Posted by ke...@apache.org.
Introduce StateInternalsFactory
This class vends a StateInternals for a particular key.
Various DoFns that access StateInternals via ProcessContext
will be ported to instead accept a StateInternalsFactory.
The StateInternalsFactory must be provided by the runner
to enable provisioning of the appropriate run-time
StateInternals.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d2594e0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d2594e0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d2594e0f
Branch: refs/heads/master
Commit: d2594e0fc588849dba905e168523deacd4f1f4dd
Parents: 20244ba
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 18 13:56:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700
----------------------------------------------------------------------
.../sdk/util/state/StateInternalsFactory.java | 36 ++++++++++++++++++++
1 file changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d2594e0f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
new file mode 100644
index 0000000..54355c7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+import java.io.Serializable;
+
+/**
+ * A factory for providing {@link StateInternals} for a particular key.
+ *
+ * <p>Because it will generally be embedded in a {@link DoFn}, albeit at execution time,
+ * it is marked {@link Serializable}.
+ */
+@Experimental(Kind.STATE)
+public interface StateInternalsFactory<K> {
+
+ /** Returns {@link StateInternals} for the provided key. */
+ StateInternals<K> stateInternalsForKey(K key);
+}
[3/4] incubator-beam git commit: Tidy WriteWithShardingFactory
Posted by ke...@apache.org.
Tidy WriteWithShardingFactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20244bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20244bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20244bad
Branch: refs/heads/master
Commit: 20244badc3d1b8defd9e5b9a718f54475c502365
Parents: cf14644
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jul 19 19:16:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 09:30:32 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/WriteWithShardingFactory.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20244bad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 93f2408..d6ee6ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -78,7 +78,8 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
- final PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView());
+ final PCollectionView<Long> numRecords = records
+ .apply("CountRecords", Count.<T>globally().asSingletonView());
PCollection<T> resharded =
records
.apply(
@@ -107,7 +108,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
private final PCollectionView<Long> numRecords;
private final int randomExtraShards;
private int currentShard;
- private int maxShards;
+ private int maxShards = 0;
KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
this.numRecords = numRecords;
@@ -116,7 +117,7 @@ class WriteWithShardingFactory implements PTransformOverrideFactory {
@Override
public void processElement(ProcessContext c) throws Exception {
- if (maxShards == 0L) {
+ if (maxShards == 0) {
maxShards = calculateShards(c.sideInput(numRecords));
currentShard = ThreadLocalRandom.current().nextInt(maxShards);
}
[4/4] incubator-beam git commit: This closes #697
Posted by ke...@apache.org.
This closes #697
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7809f6bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7809f6bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7809f6bd
Branch: refs/heads/master
Commit: 7809f6bd2cd3605ce389a174aba242ea08860f88
Parents: cf14644 902997d
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 25 12:11:27 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 25 12:11:27 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 16 ++--
.../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 10 ++-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 10 +--
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 79 +++++++++++++++++---
.../sdk/util/GroupAlsoByWindowsProperties.java | 50 +++++++++++--
...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 30 ++++++++
.../beam/runners/direct/ParDoEvaluator.java | 4 +-
.../direct/ParDoMultiEvaluatorFactory.java | 6 ++
.../direct/ParDoSingleEvaluatorFactory.java | 6 ++
.../direct/WriteWithShardingFactory.java | 7 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 1 +
.../FlinkGroupAlsoByWindowWrapper.java | 31 +++++---
.../spark/translation/TransformTranslator.java | 70 ++++++++++++++++-
.../apache/beam/sdk/transforms/DoFnTester.java | 41 +++++++++-
.../sdk/util/state/StateInternalsFactory.java | 36 +++++++++
16 files changed, 349 insertions(+), 56 deletions(-)
----------------------------------------------------------------------