You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:49 UTC
[22/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index 365b6c4..c0919b9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,9 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
c.element()[0] = 'b';
}
}));
- PCollection<Long> consumer = pcollection.apply(Count.<byte[]>globally());
- DirectGraphs.performDirectOverrides(p);
- this.consumer = DirectGraphs.getProducer(consumer);
+ consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 7912538..09a21ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
ParDoEvaluator<Integer> evaluator =
- createEvaluator(singletonView, fn, inputPc, output);
+ createEvaluator(singletonView, fn, output);
IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
@@ -132,7 +132,6 @@ public class ParDoEvaluatorTest {
private ParDoEvaluator<Integer> createEvaluator(
PCollectionView<Integer> singletonView,
RecorderFn fn,
- PCollection<Integer> input,
PCollection<Integer> output) {
when(
evaluationContext.createSideInputReader(
@@ -150,7 +149,6 @@ public class ParDoEvaluatorTest {
Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
.thenReturn(executionContext);
- DirectGraphs.performDirectOverrides(p);
@SuppressWarnings("unchecked")
AppliedPTransform<PCollection<Integer>, ?, ?> transform =
(AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);
@@ -158,7 +156,8 @@ public class ParDoEvaluatorTest {
evaluationContext,
stepContext,
transform,
- input.getWindowingStrategy(),
+ ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values()))
+ .getWindowingStrategy(),
fn,
null /* key */,
ImmutableList.<PCollectionView<?>>of(singletonView),
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index fe0b743..9366b7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -41,7 +41,6 @@ import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -53,6 +52,7 @@ import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -128,17 +128,16 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
input
.apply(
new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
- new DoFn<KV<String, Integer>, Integer>() {
- @StateId(stateId)
- private final StateSpec<ValueState<String>> spec =
- StateSpecs.value(StringUtf8Coder.of());
-
- @ProcessElement
- public void process(ProcessContext c) {}
- },
- mainOutput,
- TupleTagList.empty(),
- Collections.<PCollectionView<?>>emptyList()))
+ ParDo.of(
+ new DoFn<KV<String, Integer>, Integer>() {
+ @StateId(stateId)
+ private final StateSpec<ValueState<String>> spec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ })
+ .withOutputTags(mainOutput, TupleTagList.empty())))
.get(mainOutput)
.setCoder(VarIntCoder.of());
@@ -154,7 +153,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
when(mockEvaluationContext.getExecutionContext(
eq(producingTransform), Mockito.<StructuralKey>any()))
.thenReturn(mockExecutionContext);
- when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext);
+ when(mockExecutionContext.getStepContext(anyString()))
+ .thenReturn(mockStepContext);
IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(9));
IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(19));
@@ -241,17 +241,18 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
mainInput
.apply(
new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
- new DoFn<KV<String, Integer>, Integer>() {
- @StateId(stateId)
- private final StateSpec<ValueState<String>> spec =
- StateSpecs.value(StringUtf8Coder.of());
-
- @ProcessElement
- public void process(ProcessContext c) {}
- },
- mainOutput,
- TupleTagList.empty(),
- Collections.<PCollectionView<?>>singletonList(sideInput)))
+ ParDo
+ .of(
+ new DoFn<KV<String, Integer>, Integer>() {
+ @StateId(stateId)
+ private final StateSpec<ValueState<String>> spec =
+ StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void process(ProcessContext c) {}
+ })
+ .withSideInputs(sideInput)
+ .withOutputTags(mainOutput, TupleTagList.empty())))
.get(mainOutput)
.setCoder(VarIntCoder.of());
@@ -268,7 +269,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
when(mockEvaluationContext.getExecutionContext(
eq(producingTransform), Mockito.<StructuralKey>any()))
.thenReturn(mockExecutionContext);
- when(mockExecutionContext.getStepContext(anyString())).thenReturn(mockStepContext);
+ when(mockExecutionContext.getStepContext(anyString()))
+ .thenReturn(mockStepContext);
when(mockEvaluationContext.createBundle(Matchers.<PCollection<Integer>>any()))
.thenReturn(mockUncommittedBundle);
when(mockStepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
@@ -285,8 +287,11 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
// global window state merely by having the evaluator created. The cleanup logic does not
// depend on the window.
String key = "hello";
- WindowedValue<KV<String, Integer>> firstKv =
- WindowedValue.of(KV.of(key, 1), new Instant(3), firstWindow, PaneInfo.NO_FIRING);
+ WindowedValue<KV<String, Integer>> firstKv = WindowedValue.of(
+ KV.of(key, 1),
+ new Instant(3),
+ firstWindow,
+ PaneInfo.NO_FIRING);
WindowedValue<KeyedWorkItem<String, KV<String, Integer>>> gbkOutputElement =
firstKv.withValue(
@@ -301,8 +306,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
BUNDLE_FACTORY
.createBundle(
(PCollection<KeyedWorkItem<String, KV<String, Integer>>>)
- Iterables.getOnlyElement(
- TransformInputs.nonAdditionalInputs(producingTransform)))
+ Iterables.getOnlyElement(producingTransform.getInputs().values()))
.add(gbkOutputElement)
.commit(Instant.now());
TransformEvaluator<KeyedWorkItem<String, KV<String, Integer>>> evaluator =
@@ -312,7 +316,8 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
// This should push back every element as a KV<String, Iterable<Integer>>
// in the appropriate window. Since the keys are equal they are single-threaded
- TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result = evaluator.finishBundle();
+ TransformResult<KeyedWorkItem<String, KV<String, Integer>>> result =
+ evaluator.finishBundle();
List<Integer> pushedBackInts = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index b7f5a7c..86412a0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -25,8 +25,6 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
@@ -92,7 +90,6 @@ public class TransformExecutorTest {
created = p.apply(Create.of("foo", "spam", "third"));
PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
- DirectGraphs.performDirectOverrides(p);
DirectGraph graph = DirectGraphs.getGraph(p);
createdProducer = graph.getProducer(created);
downstreamProducer = graph.getProducer(downstream);
@@ -417,13 +414,8 @@ public class TransformExecutorTest {
? Collections.emptyList()
: result.getUnprocessedElements();
- Optional<? extends CommittedBundle<?>> unprocessedBundle;
- if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
- unprocessedBundle = Optional.absent();
- } else {
- unprocessedBundle =
- Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));
- }
+ CommittedBundle<?> unprocessedBundle =
+ inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
return CommittedResult.create(
result,
unprocessedBundle,
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..419698e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -65,13 +66,12 @@ public class ViewEvaluatorFactoryTest {
.setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
.apply(GroupByKey.<Void, String>create())
.apply(Values.<Iterable<String>>create());
- PCollection<Iterable<String>> view =
- concat.apply(
- new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+ PCollectionView<Iterable<String>> view =
+ concat.apply(new ViewOverrideFactory.WriteView<>(createView));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
- when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+ when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..024e15c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -37,11 +37,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -66,19 +63,24 @@ public class ViewOverrideFactoryTest implements Serializable {
PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
final PCollectionView<List<Integer>> view =
PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
- PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
+ PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>>
replacementTransform =
factory.getReplacementTransform(
AppliedPTransform
- .<PCollection<Integer>, PCollection<Integer>,
- PTransform<PCollection<Integer>, PCollection<Integer>>>
+ .<PCollection<Integer>, PCollectionView<List<Integer>>,
+ CreatePCollectionView<Integer, List<Integer>>>
of(
"foo",
ints.expand(),
view.expand(),
CreatePCollectionView.<Integer, List<Integer>>of(view),
p));
- ints.apply(replacementTransform.getTransform());
+ PCollectionView<List<Integer>> afterReplacement =
+ ints.apply(replacementTransform.getTransform());
+ assertThat(
+ "The CreatePCollectionView replacement should return the same View",
+ afterReplacement,
+ equalTo(view));
PCollection<Set<Integer>> outputViewContents =
p.apply("CreateSingleton", Create.of(0))
@@ -102,11 +104,11 @@ public class ViewOverrideFactoryTest implements Serializable {
final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
final PCollectionView<List<Integer>> view =
PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
- PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
+ PTransformReplacement<PCollection<Integer>, PCollectionView<List<Integer>>> replacement =
factory.getReplacementTransform(
AppliedPTransform
- .<PCollection<Integer>, PCollection<Integer>,
- PTransform<PCollection<Integer>, PCollection<Integer>>>
+ .<PCollection<Integer>, PCollectionView<List<Integer>>,
+ CreatePCollectionView<Integer, List<Integer>>>
of(
"foo",
ints.expand(),
@@ -124,19 +126,8 @@ public class ViewOverrideFactoryTest implements Serializable {
"There should only be one WriteView primitive in the graph",
writeViewVisited.getAndSet(true),
is(false));
- PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
-
- // replacementView.getPCollection() is null, but that is not a requirement
- // so not asserted one way or the other
- assertThat(
- replacementView.getTagInternal(),
- equalTo(view.getTagInternal()));
- assertThat(
- replacementView.getViewFn(),
- Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
- assertThat(
- replacementView.getWindowMappingFn(),
- Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
+ PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
+ assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
assertThat(node.getInputs().entrySet(), hasSize(1));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index 1d8aac1..b667346 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -59,7 +59,6 @@ public class WatermarkCallbackExecutorTest {
public void setup() {
PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
PCollection<Integer> summed = created.apply(Sum.integersGlobally());
- DirectGraphs.performDirectOverrides(p);
DirectGraph graph = DirectGraphs.getGraph(p);
create = graph.getProducer(created);
sum = graph.getProducer(summed);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index e3f6215..9528ac9 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -122,7 +121,6 @@ public class WatermarkManagerTest implements Serializable {
flattened = preFlatten.apply("flattened", Flatten.<Integer>pCollections());
clock = MockClock.fromInstant(new Instant(1000));
- DirectGraphs.performDirectOverrides(p);
graph = DirectGraphs.getGraph(p);
manager = WatermarkManager.create(clock, graph);
@@ -319,7 +317,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(graph.getProducer(created)).build(),
- Optional.<CommittedBundle<?>>absent(),
+ root.withElements(Collections.<WindowedValue<Void>>emptyList()),
Collections.singleton(createBundle),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -333,7 +331,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(theFlatten).build(),
- Optional.<CommittedBundle<?>>absent(),
+ createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -346,7 +344,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(theFlatten).build(),
- Optional.<CommittedBundle<?>>absent(),
+ createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1502,15 +1500,9 @@ public class WatermarkManagerTest implements Serializable {
AppliedPTransform<?, ?, ?> transform,
@Nullable CommittedBundle<?> unprocessedBundle,
Iterable<? extends CommittedBundle<?>> bundles) {
- Optional<? extends CommittedBundle<?>> unprocessedElements;
- if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) {
- unprocessedElements = Optional.absent();
- } else {
- unprocessedElements = Optional.of(unprocessedBundle);
- }
return CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- unprocessedElements,
+ unprocessedBundle,
bundles,
Iterables.isEmpty(bundles)
? EnumSet.noneOf(OutputType.class)
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 546a181..a88d95e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -30,7 +30,6 @@ import static org.junit.Assert.assertThat;
import java.io.File;
import java.io.FileReader;
import java.io.Reader;
-import java.io.Serializable;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -39,8 +38,9 @@ import java.util.UUID;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.TextIO;
@@ -53,8 +53,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -73,17 +71,11 @@ import org.junit.runners.JUnit4;
* Tests for {@link WriteWithShardingFactory}.
*/
@RunWith(JUnit4.class)
-public class WriteWithShardingFactoryTest implements Serializable {
-
+public class WriteWithShardingFactoryTest {
private static final int INPUT_SIZE = 10000;
-
- @Rule public transient TemporaryFolder tmp = new TemporaryFolder();
-
- private transient WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
-
- @Rule
- public final transient TestPipeline p =
- TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ @Rule public TemporaryFolder tmp = new TemporaryFolder();
+ private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
+ @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Test
public void dynamicallyReshardedWrite() throws Exception {
@@ -137,24 +129,26 @@ public class WriteWithShardingFactoryTest implements Serializable {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
-
- PTransform<PCollection<Object>, PDone> original =
+ FilenamePolicy policy =
+ DefaultFilenamePolicy.constructUsingStandardParameters(
+ StaticValueProvider.of(outputDirectory),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
+ "",
+ false);
+ WriteFiles<Object> original =
WriteFiles.to(
- new FileBasedSink<Object, Void>(
- StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+ new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
- },
- SerializableFunctions.identity());
+ });
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
- AppliedPTransform<PCollection<Object>, PDone, PTransform<PCollection<Object>, PDone>>
- originalApplication =
- AppliedPTransform.of(
- "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+ AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
+ AppliedPTransform.of(
+ "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
assertThat(
factory.getReplacementTransform(originalApplication).getTransform(),
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c063a2d..c4c6b55 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -91,6 +91,7 @@
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.LargeKeys$Above100MB,
+ org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesSplittableParDo
@@ -380,13 +381,5 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
deleted file mode 100644
index 0cc3aec..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Flink streaming overrides for various view (side input) transforms. */
-class CreateStreamingFlinkView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
- private final PCollectionView<ViewT> view;
-
- public CreateStreamingFlinkView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- @Override
- public PCollection<ElemT> expand(PCollection<ElemT> input) {
- input
- .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
- .apply(CreateFlinkPCollectionView.<ElemT, ViewT>of(view));
- return input;
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link CreateStreamingFlinkView}. This combiner requires that the input
- * {@link PCollection} fits in memory. For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<T>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateFlinkPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> {
- private PCollectionView<ViewT> view;
-
- private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateFlinkPCollectionView<>(view);
- }
-
- @Override
- public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
- return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
- }
-
- public PCollectionView<ViewT> getView() {
- return view;
- }
- }
-
- public static class Factory<ElemT, ViewT>
- implements PTransformOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
- public Factory() {}
-
- @Override
- public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
- AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
- transform) {
- return PTransformReplacement.of(
- (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()),
- new CreateStreamingFlinkView<ElemT, ViewT>(transform.getTransform().getView()));
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
- return ReplacementOutputs.singleton(outputs, newOutput);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 6e70198..0439119 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.flink;
import com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -144,7 +143,7 @@ class FlinkBatchTranslationContext {
@SuppressWarnings("unchecked")
<T extends PValue> T getInput(PTransform<T, ?> transform) {
- return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+ return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
}
Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d2a2016..fe5dd87 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -84,8 +84,6 @@ class FlinkPipelineExecutionEnvironment {
this.flinkBatchEnv = null;
this.flinkStreamEnv = null;
- pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
-
PipelineTranslationOptimizer optimizer =
new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index f733e2e..8da68c5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,18 +17,27 @@
*/
package org.apache.beam.runners.flink;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
@@ -64,8 +73,54 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void translate(Pipeline pipeline) {
+ List<PTransformOverride> transformOverrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(),
+ new SplittableParDoOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+ new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
+ // this has to be last since the ViewAsSingleton override
+ // can expand to a Combine.GloballyAsSingletonView
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new ReflectiveOneToOneOverrideFactory(
+ FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+ flinkRunner)))
+ .build();
+
// Ensure all outputs of all reads are consumed.
UnconsumedReads.ensureAllReadsConsumed(pipeline);
+ pipeline.replaceAll(transformOverrides);
super.translate(pipeline);
}
@@ -173,6 +228,35 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
}
}
+ private static class ReflectiveOneToOneOverrideFactory<
+ InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+ extends SingleInputOutputOverrideFactory<
+ PCollection<InputT>, PCollection<OutputT>, TransformT> {
+ private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
+ private final FlinkRunner runner;
+
+ private ReflectiveOneToOneOverrideFactory(
+ Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
+ FlinkRunner runner) {
+ this.replacement = replacement;
+ this.runner = runner;
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+ AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ InstanceBuilder.ofType(replacement)
+ .withArg(FlinkRunner.class, runner)
+ .withArg(
+ (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+ transform.getTransform().getClass(),
+ transform.getTransform())
+ .build());
+ }
+ }
+
/**
* A {@link PTransformOverrideFactory} that overrides a <a
* href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
@@ -188,7 +272,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform));
+ new SplittableParDo<>(transform.getTransform()));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 3d7e81f..2a7c5d6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -123,7 +124,7 @@ class FlinkStreamingTransformTranslators {
TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
TRANSLATORS.put(
- CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
+ FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
new CreateViewStreamingTranslator());
TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
@@ -362,13 +363,8 @@ class FlinkStreamingTransformTranslators {
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
if (!tagsToOutputTags.containsKey(entry.getKey())) {
- tagsToOutputTags.put(
- entry.getKey(),
- new OutputTag<WindowedValue<?>>(
- entry.getKey().getId(),
- (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())
- )
- );
+ tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(),
+ (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())));
}
}
@@ -547,11 +543,14 @@ class FlinkStreamingTransformTranslators {
transform.getAdditionalOutputTags().getAll(),
context,
new ParDoTranslationHelper.DoFnOperatorFactory<
- KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>() {
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
@Override
- public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
- createDoFnOperator(
- DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+ public DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT> createDoFnOperator(
+ DoFn<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
@@ -559,8 +558,11 @@ class FlinkStreamingTransformTranslators {
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
- Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
- inputCoder,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<
+ String,
+ ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
Coder keyCoder,
Map<Integer, PCollectionView<?>> transformedSideInputs) {
return new SplittableDoFnOperator<>(
@@ -582,17 +584,17 @@ class FlinkStreamingTransformTranslators {
private static class CreateViewStreamingTranslator<ElemT, ViewT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT>> {
+ FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
@Override
public void translateNode(
- CreateStreamingFlinkView.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+ FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
FlinkStreamingTranslationContext context) {
// just forward
DataStream<WindowedValue<List<ElemT>>> inputDataSet =
context.getInputDataStream(context.getInput(transform));
- PCollectionView<ViewT> view = transform.getView();
+ PCollectionView<ViewT> view = context.getOutput(transform);
context.setOutputDataStream(view, inputDataSet);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 74a5fb9..ea5f6b3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -114,7 +113,7 @@ class FlinkStreamingTranslationContext {
@SuppressWarnings("unchecked")
public <T extends PValue> T getInput(PTransform<T, ?> transform) {
- return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
+ return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
}
public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
new file mode 100644
index 0000000..ce1c895
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+/**
+ * Flink streaming overrides for various view (side input) transforms.
+ */
+class FlinkStreamingViewOverrides {
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the Flink runner in streaming mode.
+ */
+ static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ private final transient FlinkRunner runner;
+
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * View.AsMultimap View.AsMultimap} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ private final transient FlinkRunner runner;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsList View.AsList} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsIterable View.AsIterable} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
+
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+ * Specialized expansion for
+ * {@link View.AsSingleton View.AsSingleton} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+ private View.AsSingleton<T> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> expand(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
+ }
+
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingCombineGloballyAsSingletonView(
+ FlinkRunner runner,
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined,
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(Collections.singletonList(c.element()));
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateFlinkPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateFlinkPCollectionView<>(view);
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+ return view;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
deleted file mode 100644
index 1dc8de9..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-
-/**
- * {@link PTransform} overrides for Flink runner.
- */
-public class FlinkTransformOverrides {
- public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
- if (streaming) {
- return ImmutableList.<PTransformOverride>builder()
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(),
- new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
- new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
- new CreateStreamingFlinkView.Factory()))
- .build();
- } else {
- return ImmutableList.of();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..5d08eba 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -55,15 +55,18 @@ import org.joda.time.Instant;
* the {@code @ProcessElement} method of a splittable {@link DoFn}.
*/
public class SplittableDoFnOperator<
- InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- extends DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
private transient ScheduledExecutorService executorService;
public SplittableDoFnOperator(
- DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+ DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> doFn,
String stepName,
- Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> inputCoder,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
OutputManagerFactory<OutputT> outputManagerFactory,
@@ -84,6 +87,7 @@ public class SplittableDoFnOperator<
sideInputs,
options,
keyCoder);
+
}
@Override
@@ -147,7 +151,7 @@ public class SplittableDoFnOperator<
@Override
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
- KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
+ KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
(String) stateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}