You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/11/15 17:00:35 UTC
[1/7] beam git commit: Fix BatchViewOverrides ViewAsSingleton to
apply the combine fn that was being replaced.
Repository: beam
Updated Branches:
refs/heads/master 7ce0a82b0 -> 30886acee
Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/accb2087
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/accb2087
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/accb2087
Branch: refs/heads/master
Commit: accb2087f88c641be9db038bbb5be715aacffb8d
Parents: c2f815c
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 19:19:48 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 16 +++-
.../runners/dataflow/CreateDataflowView.java | 6 +-
.../beam/runners/dataflow/DataflowRunner.java | 80 +++++++++++++++-----
.../DataflowPipelineTranslatorTest.java | 6 +-
4 files changed, 80 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 8ed41cb..2953a42 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -893,15 +895,23 @@ class BatchViewOverrides {
private final DataflowRunner runner;
private final PCollectionView<T> view;
- /** Builds an instance of this class from the overridden transform. */
- @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) {
+ private final CombineFn<T, ?, T> combineFn;
+ private final int fanout;
+
+ public BatchViewAsSingleton(
+ DataflowRunner runner,
+ CreatePCollectionView<T, T> transform,
+ CombineFn<T, ?, T> combineFn,
+ int fanout) {
this.runner = runner;
this.view = transform.getView();
+ this.combineFn = combineFn;
+ this.fanout = fanout;
}
@Override
public PCollection<?> expand(PCollection<T> input) {
+ input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout));
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 10888c2..f64f3fb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,11 +25,13 @@ import org.apache.beam.sdk.values.PCollectionView;
/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
public class CreateDataflowView<ElemT, ViewT>
extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(
+ PCollectionView<ViewT> view) {
return new CreateDataflowView<>(view, false);
}
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(
+ PCollectionView<ViewT> view) {
return new CreateDataflowView<>(view, true);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 72e4f83..a650092 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -116,6 +116,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -406,9 +407,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
BatchViewOverrides.BatchViewAsMultimap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveViewOverrideFactory(
- BatchViewOverrides.BatchViewAsSingleton.class, this)))
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new CombineGloballyAsSingletonViewOverrideFactory(this)))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(View.AsList.class),
@@ -437,29 +437,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
- * new replacement transform uses the supplied PCollectionView and does not create another instance.
+ * Replace the {@link Combine.GloballyAsSingletonView} transform with a Dataflow-specific
+ * implementation that re-applies the original {@link CombineFn} before building the view.
+ */
+ private static class CombineGloballyAsSingletonViewOverrideFactory<InputT, ViewT>
+ extends ReflectiveViewOverrideFactory<InputT, ViewT> {
+
+ private CombineGloballyAsSingletonViewOverrideFactory(DataflowRunner runner) {
+ super((Class) BatchViewOverrides.BatchViewAsSingleton.class, runner);
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ AppliedPTransform<
+ PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+ Combine.GloballyAsSingletonView<?, ?> combineTransform =
+ (Combine.GloballyAsSingletonView) transform.getTransform();
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ new BatchViewOverrides.BatchViewAsSingleton(
+ runner,
+ findCreatePCollectionView(transform),
+ (CombineFn) combineTransform.getCombineFn(),
+ combineTransform.getFanout()));
+ }
+ }
+
+ /**
+ * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required
+ * that the new replacement transform uses the supplied PCollectionView and does not create
+ * another instance.
*/
private static class ReflectiveViewOverrideFactory<InputT, ViewT>
implements PTransformOverrideFactory<PCollection<InputT>,
PValue, PTransform<PCollection<InputT>, PValue>> {
- private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
- private final DataflowRunner runner;
+ final Class<PTransform<PCollection<InputT>, PValue>> replacement;
+ final DataflowRunner runner;
private ReflectiveViewOverrideFactory(
- Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+ Class<PTransform<PCollection<InputT>, PValue>> replacement,
DataflowRunner runner) {
this.replacement = replacement;
this.runner = runner;
}
- @Override
- public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
- final AppliedPTransform<PCollection<InputT>,
- PValue,
- PTransform<PCollection<InputT>, PValue>> transform) {
-
+ CreatePCollectionView<ViewT, ViewT> findCreatePCollectionView(
+ final AppliedPTransform<
+ PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
// Stores whether we have entered the expected composite view transform.
@@ -495,18 +524,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
checkState(viewTransformRef.get() != null,
"Expected to find CreatePCollectionView contained within %s",
transform.getTransform());
- PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+ return viewTransformRef.get();
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ final AppliedPTransform<PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+
+ PTransform<PCollection<InputT>, PValue> rep =
InstanceBuilder.ofType(replacement)
.withArg(DataflowRunner.class, runner)
- .withArg(CreatePCollectionView.class, viewTransformRef.get())
+ .withArg(CreatePCollectionView.class, findCreatePCollectionView(transform))
.build();
- return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
}
@Override
- public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
- // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
- // PCollectionView that was returned.
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+ // We do not replace any of the outputs because we expect that the new PTransform will
+ // re-use the original PCollectionView that was returned.
return ImmutableMap.of();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index e03abb9..81e7a97 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(5, steps.size());
+ assertEquals(9, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(4);
+ Step collectionToSingletonStep = steps.get(8);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
[4/7] beam git commit: [BEAM-2926] Migrate to using a trivial
multimap materialization within the Java SDK.
Posted by lc...@apache.org.
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2593da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2593da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2593da
Branch: refs/heads/master
Commit: 5e2593daacec83e876b747d56d8c335531a54d1d
Parents: 7ce0a82
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:08:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../apex/translation/ParDoTranslatorTest.java | 3 +-
.../core/construction/PTransformMatchers.java | 3 +-
.../core/construction/ParDoTranslation.java | 17 +-
.../CreatePCollectionViewTranslationTest.java | 10 +-
.../construction/PTransformMatchersTest.java | 33 +-
.../core/construction/ParDoTranslationTest.java | 7 +-
.../core/InMemoryMultimapSideInputView.java | 62 +++
.../beam/runners/core/SideInputHandler.java | 63 ++--
.../core/InMemoryMultimapSideInputViewTest.java | 53 +++
.../beam/runners/core/SideInputHandlerTest.java | 89 +++--
.../beam/runners/direct/SideInputContainer.java | 38 +-
.../runners/direct/EvaluationContextTest.java | 44 ++-
.../runners/direct/SideInputContainerTest.java | 226 +++++------
.../direct/ViewEvaluatorFactoryTest.java | 13 +-
.../runners/direct/ViewOverrideFactoryTest.java | 9 +-
.../direct/WriteWithShardingFactoryTest.java | 9 +-
.../FlinkStreamingTransformTranslators.java | 1 -
.../functions/FlinkSideInputReader.java | 27 +-
.../functions/SideInputInitializer.java | 50 ++-
.../flink/streaming/DoFnOperatorTest.java | 40 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
.../spark/translation/TransformTranslator.java | 7 +-
.../spark/util/SparkSideInputReader.java | 50 ++-
.../org/apache/beam/sdk/transforms/Combine.java | 13 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 20 +-
.../beam/sdk/transforms/Materializations.java | 29 +-
.../org/apache/beam/sdk/transforms/View.java | 67 +++-
.../org/apache/beam/sdk/transforms/ViewFn.java | 6 +-
.../apache/beam/sdk/values/PCollectionView.java | 7 +-
.../beam/sdk/values/PCollectionViews.java | 256 ++++++-------
.../sdk/testing/PCollectionViewTesting.java | 375 +++----------------
.../beam/sdk/transforms/DoFnTesterTest.java | 12 +-
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 14 +-
33 files changed, 809 insertions(+), 856 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 73382e3..4a4ca1d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.apex.translation;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -219,7 +220,7 @@ public class ParDoTranslatorTest {
operator.beginWindow(0);
WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
- Lists.<Integer>newArrayList(22));
+ materializeValuesFor(View.asSingleton(), 22));
operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
final List<Object> results = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 0d27241..42ac73f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
@@ -304,7 +303,7 @@ public class PTransformMatchers {
}
CreatePCollectionView<?, ?> createView =
(CreatePCollectionView<?, ?>) application.getTransform();
- ViewFn<Iterable<WindowedValue<?>>, ?> viewFn = createView.getView().getViewFn();
+ ViewFn<?, ?> viewFn = createView.getView().getViewFn();
return viewFn.getClass().equals(viewFnType);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index f88cbe5..e00b912 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -73,8 +72,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
@@ -561,25 +558,19 @@ public class ParDoTranslation {
ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
- Coder<Iterable<WindowedValue<?>>> coder =
- (Coder)
- IterableCoder.of(
- FullWindowedValueCoder.of(
- pCollection.getCoder(),
- pCollection.getWindowingStrategy().getWindowFn().windowCoder()));
checkArgument(
- sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+ sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
"Unknown View Materialization URN %s",
sideInput.getAccessPattern().getUrn());
PCollectionView<?> view =
new RunnerPCollectionView<>(
pCollection,
- (TupleTag<Iterable<WindowedValue<?>>>) tag,
- (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+ (TupleTag) tag,
+ (ViewFn) viewFn,
windowMappingFn,
windowingStrategy,
- coder);
+ (Coder) pCollection.getCoder());
return view;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index df659a8..690e3ca 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -63,12 +65,11 @@ public class CreatePCollectionViewTranslationTest {
testPCollection.getWindowingStrategy(),
false,
null,
- testPCollection.getCoder())),
+ StringUtf8Coder.of())),
CreatePCollectionView.of(
PCollectionViews.listView(
testPCollection,
- testPCollection.getWindowingStrategy(),
- testPCollection.getCoder())));
+ testPCollection.getWindowingStrategy())));
}
@Parameter(0)
@@ -76,7 +77,8 @@ public class CreatePCollectionViewTranslationTest {
public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
- private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+ private static final PCollection<KV<Void, String>> testPCollection =
+ p.apply(Create.of(KV.of((Void) null, "one")));
@Test
public void testEncodedProto() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 324e38d..c2dab4c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -54,8 +54,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -376,9 +373,8 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = view.getViewFn();
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+ ViewFn<?, ?> viewFn = view.getViewFn();
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -388,23 +384,10 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFnDifferentViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn =
- new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() {
- @Override
- public Materialization<Iterable<WindowedValue<?>>> getMaterialization() {
- @SuppressWarnings({"rawtypes", "unchecked"})
- Materialization<Iterable<WindowedValue<?>>> materialization =
- (Materialization) Materializations.iterable();
- return materialization;
- }
-
- @Override
- public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) {
- return Collections.emptyList();
- }
- };
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+
+ // Purposely create a subclass to get a different class than what was expected.
+ ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -414,9 +397,7 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFnNotCreatePCollectionView() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
PTransformMatcher matcher =
PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index b79947e..83594f1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -166,7 +167,8 @@ public class ParDoTranslationTest {
view.getPCollection(),
protoTransform,
rehydratedComponents);
- assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+ assertThat(restoredView.getTagInternal(),
+ Matchers.<TupleTag<?>>equalTo(view.getTagInternal()));
assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
assertThat(
restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
@@ -174,7 +176,8 @@ public class ParDoTranslationTest {
restoredView.getWindowingStrategyInternal(),
Matchers.<WindowingStrategy<?, ?>>equalTo(
view.getWindowingStrategyInternal().fixDefaults()));
- assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+ assertThat(restoredView.getCoderInternal(),
+ Matchers.<Coder<?>>equalTo(view.getCoderInternal()));
}
String mainInputId = sdkComponents.registerPCollection(mainInput);
assertThat(
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
new file mode 100644
index 0000000..b451547
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * An in-memory representation of {@link MultimapView}.
+ */
+public class InMemoryMultimapSideInputView<K, V> implements Materializations.MultimapView<K, V> {
+
+ /**
+ * Creates a {@link MultimapView} from the provided values. The provided {@link Coder} is used
+ * to guarantee structural equality for keys instead of assuming Java object equality.
+ */
+ public static <K, V> MultimapView<K, V> fromIterable(
+ Coder<K> keyCoder, Iterable<KV<K, V>> values) {
+ // We specifically use an array list multimap to allow for:
+ // * null keys
+ // * null values
+ // * duplicate values
+ Multimap<Object, Object> multimap = ArrayListMultimap.create();
+ for (KV<K, V> value : values) {
+ multimap.put(keyCoder.structuralValue(value.getKey()), value.getValue());
+ }
+ return new InMemoryMultimapSideInputView(keyCoder, Multimaps.unmodifiableMultimap(multimap));
+ }
+
+ private final Coder<K> keyCoder;
+ private final Multimap<Object, V> structuralKeyToValuesMap;
+
+ private InMemoryMultimapSideInputView(Coder<K> keyCoder, Multimap<Object, V> data) {
+ this.keyCoder = keyCoder;
+ this.structuralKeyToValuesMap = data;
+ }
+
+ @Override
+ public Iterable<V> get(K k) {
+ return structuralKeyToValuesMap.get(keyCoder.structuralValue(k));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 3b37702..3ff4c94 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -17,22 +17,28 @@
*/
package org.apache.beam.runners.core;
-import java.util.ArrayList;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
/**
@@ -58,7 +64,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
/**
* State internals that are scoped not to the key of a value but are global. The state can still
- * be keep locally but if side inputs are broadcast to all parallel operators then all will
+ * be kept locally but if side inputs are broadcast to all parallel operators then all will
* have the same view of the state.
*/
private final StateInternals stateInternals;
@@ -80,7 +86,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
*/
private final Map<
PCollectionView<?>,
- StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+ StateTag<ValueState<Iterable<?>>>> sideInputContentsTags;
/**
* Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -94,7 +100,15 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
this.availableWindowsTags = new HashMap<>();
this.sideInputContentsTags = new HashMap<>();
- for (PCollectionView<?> sideInput: sideInputs) {
+ for (PCollectionView<?> sideInput : sideInputs) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ sideInput.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ sideInput.getViewFn().getMaterialization().getUrn(),
+ sideInput.getTagInternal().getId());
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
@@ -114,9 +128,9 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
availableWindowsTags.put(sideInput, availableTag);
- Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
- StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+ StateTag<ValueState<Iterable<?>>> stateTag =
+ StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(),
+ (Coder) IterableCoder.of(sideInput.getCoderInternal()));
sideInputContentsTags.put(sideInput, stateTag);
}
}
@@ -129,7 +143,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
public void addSideInputValue(
PCollectionView<?> sideInput,
WindowedValue<Iterable<?>> value) {
-
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) sideInput
@@ -137,19 +150,13 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
.getWindowFn()
.windowCoder();
- // reify the WindowedValue
- List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
- for (Object e: value.getValue()) {
- inputWithReifiedWindows.add(value.withValue(e));
- }
-
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTag<ValueState<Iterable<?>>> stateTag =
sideInputContentsTags.get(sideInput);
- for (BoundedWindow window: value.getWindows()) {
+ for (BoundedWindow window : value.getWindows()) {
stateInternals
.state(StateNamespaces.window(windowCoder, window), stateTag)
- .write(inputWithReifiedWindows);
+ .write(value.getValue());
stateInternals
.state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
@@ -159,28 +166,32 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
@Nullable
@Override
- public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
-
+ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
- (Coder<BoundedWindow>) sideInput
+ (Coder<BoundedWindow>) view
.getWindowingStrategyInternal()
.getWindowFn()
.windowCoder();
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
- sideInputContentsTags.get(sideInput);
+ StateTag<ValueState<Iterable<?>>> stateTag =
+ sideInputContentsTags.get(view);
- ValueState<Iterable<WindowedValue<?>>> state =
+ ValueState<Iterable<?>> state =
stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
- @Nullable Iterable<WindowedValue<?>> elements = state.read();
+ // TODO: Add support for choosing which representation is contained based upon the
+ // side input materialization. We currently can assume that we always have a multimap
+ // materialization as that is the only supported type within the Java SDK.
+ // Unchecked cast: the constructor rejects views whose materialization URN is not the
+ // multimap URN, so the stored contents are expected to be KVs.
+ @SuppressWarnings("unchecked")
+ @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read();
if (elements == null) {
elements = Collections.emptyList();
}
- return sideInput.getViewFn().apply(elements);
+ // Relies on the same multimap-materialization invariant: the ViewFn is assumed to consume a
+ // MultimapView and the view's coder is assumed to be a KvCoder.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ MultimapView multimapView =
+ InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements);
+ return viewFn.apply(multimapView);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
new file mode 100644
index 0000000..6840355
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link InMemoryMultimapSideInputView}. */
+@RunWith(JUnit4.class)
+public class InMemoryMultimapSideInputViewTest {
+  /**
+   * Distinct {@code byte[]} instances with equal contents must resolve to the same values, since
+   * keys are compared by the key coder's structural value rather than by array identity.
+   */
+  @Test
+  public void testStructuralKeyEquality() {
+    MultimapView<byte[], Integer> view = InMemoryMultimapSideInputView.fromIterable(
+        ByteArrayCoder.of(),
+        ImmutableList.of(KV.of(new byte[]{ 0x00 }, 0), KV.of(new byte[]{ 0x01 }, 1)));
+    // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+    assertEquals(ImmutableList.of(0), view.get(new byte[]{ 0x00 }));
+    assertEquals(ImmutableList.of(1), view.get(new byte[]{ 0x01 }));
+    assertEquals(ImmutableList.of(), view.get(new byte[]{ 0x02 }));
+  }
+
+  /** Values sharing a key are grouped in encounter order; unknown keys yield no values. */
+  @Test
+  public void testValueGrouping() {
+    MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable(
+        StringUtf8Coder.of(),
+        ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a2"), KV.of("B", "b1")));
+    assertEquals(ImmutableList.of("a1", "a2"), view.get("A"));
+    assertEquals(ImmutableList.of("b1"), view.get("B"));
+    assertEquals(ImmutableList.of(), view.get("C"));
+  }
+
+  /** Identical values under the same key are all retained rather than de-duplicated. */
+  @Test
+  public void testDuplicateValuesAreRetained() {
+    MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable(
+        StringUtf8Coder.of(),
+        ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a1")));
+    assertEquals(ImmutableList.of("a1", "a1"), view.get("A"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index f9e0aaf..7cbd1b0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -17,24 +17,28 @@
*/
package org.apache.beam.runners.core;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -47,26 +51,19 @@ public class SideInputHandlerTest {
private static final long WINDOW_MSECS_1 = 100;
private static final long WINDOW_MSECS_2 = 500;
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
- private PCollectionView<Iterable<String>> view1 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy1);
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
- private PCollectionView<Iterable<String>> view2 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy2);
+ private PCollectionView<Iterable<String>> view1;
+ private PCollectionView<Iterable<String>> view2;
+
+  /**
+   * Builds two iterable side-input views over the same one-element input, windowed into fixed
+   * windows of {@code WINDOW_MSECS_1} and {@code WINDOW_MSECS_2} milliseconds respectively.
+   */
+  @Before
+  public void setUp() {
+    PCollection<String> source = Pipeline.create().apply(Create.of("1"));
+
+    PCollection<String> windowedForView1 =
+        source.apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))));
+    view1 = windowedForView1.apply(View.<String>asIterable());
+
+    PCollection<String> windowedForView2 =
+        source.apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))));
+    view2 = windowedForView2.apply(View.<String>asIterable());
+  }
@Test
public void testIsEmpty() {
@@ -113,7 +110,9 @@ public class SideInputHandlerTest {
// add a value for view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
// now side input should be ready
assertTrue(sideInputHandler.isReady(view1, firstWindow));
@@ -139,16 +138,20 @@ public class SideInputHandlerTest {
// add a first value for view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), window));
- Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, window), contains("Hello"));
// subsequent values should replace existing values
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Ciao", "Buongiorno"),
+ new Instant(0), window));
- Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+ assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
}
@Test
@@ -166,19 +169,21 @@ public class SideInputHandlerTest {
// add a first value for view1 in the first window
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
// add something for second window of view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Arrivederci"),
+ new Instant(0), secondWindow));
- Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+ assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
// contents for first window should be unaffected
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
@Test
@@ -194,9 +199,10 @@ public class SideInputHandlerTest {
// add value for view1 in the first window
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
// view2 should not have any data
assertFalse(sideInputHandler.isReady(view2, firstWindow));
@@ -204,18 +210,19 @@ public class SideInputHandlerTest {
// also add some data for view2
sideInputHandler.addSideInputValue(
view2,
- valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Salut"),
+ new Instant(0), firstWindow));
assertTrue(sideInputHandler.isReady(view2, firstWindow));
- Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+ assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
// view1 should not be affected by that
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
+  /**
+   * Wraps the given side input {@code values} (in these tests, the output of
+   * {@code materializeValuesFor}) in a single {@link WindowedValue} placed in {@code window} at
+   * {@code timestamp} with {@link PaneInfo#NO_FIRING}.
+   */
@SuppressWarnings({"unchecked", "rawtypes"})
private WindowedValue<Iterable<?>> valuesInWindow(
- Iterable<?> values, Instant timestamp, BoundedWindow window) {
+ List<Object> values, Instant timestamp, BoundedWindow window) {
+  // Raw cast: WindowedValue<List<Object>> is not a subtype of WindowedValue<Iterable<?>>.
return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 43da92f..ea8f168 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
@@ -35,11 +36,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -60,6 +68,16 @@ class SideInputContainer {
*/
public static SideInputContainer create(
final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+ for (PCollectionView<?> pCollectionView : containedViews) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ pCollectionView.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ pCollectionView.getViewFn().getMaterialization().getUrn(),
+ pCollectionView.getTagInternal().getId());
+ }
LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
return new SideInputContainer(containedViews, viewByWindows);
@@ -239,11 +257,21 @@ class SideInputContainer {
"calling get() on PCollectionView %s that is not ready in window %s",
view,
window);
- // Safe covariant cast
- @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
- (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
- window)).get();
- return view.getViewFn().apply(values);
+ // Safe covariant cast since we know that the view only contains KVs.
+ @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform(
+ (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked(
+ PCollectionViewWindow.of(view, window)).get(),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ });
+
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ return viewFn.apply(
+ InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index cc9ce60..0a1ffe7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
@@ -28,7 +29,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.SideInputReader;
@@ -41,8 +41,10 @@ import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -126,33 +128,47 @@ public class EvaluationContextTest {
@Test
public void writeToViewWriterThenReadReads() {
- PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+ PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
context.createPCollectionViewWriter(
PCollection.createPrimitiveOutputInternal(
p,
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED,
- IterableCoder.of(VarIntCoder.of())),
+ IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))),
view);
BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
- WindowedValue<Integer> firstValue =
- WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Integer> secondValue =
- WindowedValue.of(
- 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
- Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
- viewWriter.add(values);
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 1)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1222),
+ window,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 2)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(8766L),
+ second,
+ PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+ }
+ viewWriter.add((Iterable) valuesBuilder.build());
SideInputReader reader =
context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
assertThat(reader.get(view, window), containsInAnyOrder(1));
assertThat(reader.get(view, second), containsInAnyOrder(2));
- WindowedValue<Integer> overrittenSecondValue =
- WindowedValue.of(
- 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
- viewWriter.add(Collections.singleton(overrittenSecondValue));
+ ImmutableList.Builder<WindowedValue<?>> overwrittenValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 4444)) {
+ overwrittenValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(8677L),
+ second,
+ PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+ }
+ viewWriter.add((Iterable) overwrittenValuesBuilder.build());
assertThat(reader.get(view, second), containsInAnyOrder(2));
// The cached value is served in the earlier reader
reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 5e7c799..91255e0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
@@ -34,8 +35,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Mean;
@@ -49,9 +48,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
@@ -134,13 +131,22 @@ public class SideInputContainerTest {
@Test
public void getAfterWriteReturnsPaneInWindow() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, valuesBuilder.build());
Map<String, Integer> viewContents =
container
@@ -153,19 +159,22 @@ public class SideInputContainerTest {
@Test
public void getReturnsLatestPaneInWindow() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1),
- new Instant(1L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2),
- new Instant(20L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ container.write(mapView, valuesBuilder.build());
Map<String, Integer> viewContents =
container
@@ -175,13 +184,15 @@ public class SideInputContainerTest {
assertThat(viewContents, hasEntry("two", 2));
assertThat(viewContents.size(), is(2));
- WindowedValue<KV<String, Integer>> three =
- WindowedValue.of(
- KV.of("three", 3),
- new Instant(300L),
- SECOND_WINDOW,
- PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+ ImmutableList.Builder<WindowedValue<?>> overwriteValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("three", 3))) {
+ overwriteValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(300L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
+ }
+ container.write(mapView, overwriteValuesBuilder.build());
Map<String, Integer> overwrittenViewContents =
container
@@ -209,10 +220,7 @@ public class SideInputContainerTest {
PCollection<KV<String, String>> input =
pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {}));
PCollectionView<Map<String, Iterable<String>>> newView =
- PCollectionViews.multimapView(
- input,
- WindowingStrategy.globalDefault(),
- KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+ input.apply(View.<String, String>asMultimap());
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("unknown views");
@@ -232,19 +240,22 @@ public class SideInputContainerTest {
@Test
public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
- WindowedValue<Double> firstWindowedValue =
- WindowedValue.of(
- 2.875,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Double> secondWindowedValue =
- WindowedValue.of(
- 4.125,
- SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 4.125)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -259,20 +270,15 @@ public class SideInputContainerTest {
@Test
public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
- WindowedValue<Integer> firstValue =
- WindowedValue.of(
- 44,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Integer> secondValue =
- WindowedValue.of(
- 44,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
- container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(iterableView, valuesBuilder.build());
assertThat(
container
@@ -283,13 +289,15 @@ public class SideInputContainerTest {
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
- WindowedValue<Double> multiWindowedValue =
- WindowedValue.of(
- 2.875,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(singletonView, ImmutableList.of(multiWindowedValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -304,19 +312,22 @@ public class SideInputContainerTest {
@Test
public void finishDoesNotOverwriteWrittenElements() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1),
- new Instant(1L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2),
- new Instant(20L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ container.write(mapView, valuesBuilder.build());
immediatelyInvokeCallback(mapView, SECOND_WINDOW);
@@ -362,14 +373,15 @@ public class SideInputContainerTest {
*/
@Test
public void isReadyForSomeNotReadyViewsFalseUntilElements() {
- container.write(
- mapView,
- ImmutableList.of(
- WindowedValue.of(
- KV.of("one", 1),
- SECOND_WINDOW.maxTimestamp().minus(100L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> mapValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ mapValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, mapValuesBuilder.build());
ReadyCheckingSideInputReader reader =
container.createReaderForViews(ImmutableList.of(mapView, singletonView));
@@ -378,25 +390,27 @@ public class SideInputContainerTest {
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
- container.write(
- mapView,
- ImmutableList.of(
- WindowedValue.of(
- KV.of("too", 2),
- FIRST_WINDOW.maxTimestamp().minus(100L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> newMapValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("too", 2))) {
+ newMapValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(100L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, newMapValuesBuilder.build());
// Cached value is false
assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
- container.write(
- singletonView,
- ImmutableList.of(
- WindowedValue.of(
- 1.25,
- SECOND_WINDOW.maxTimestamp().minus(100L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> singletonValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 1.25)) {
+ singletonValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, singletonValuesBuilder.build());
assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..3716ec8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -56,10 +56,7 @@ public class ViewEvaluatorFactoryTest {
@Test
public void testInMemoryEvaluator() throws Exception {
PCollection<String> input = p.apply(Create.of("foo", "bar"));
- CreatePCollectionView<String, Iterable<String>> createView =
- CreatePCollectionView.of(
- PCollectionViews.iterableView(
- input, input.getWindowingStrategy(), StringUtf8Coder.of()));
+ PCollectionView<Iterable<String>> pCollectionView = input.apply(View.<String>asIterable());
PCollection<Iterable<String>> concat =
input.apply(WithKeys.<Void, String>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
@@ -67,11 +64,11 @@ public class ViewEvaluatorFactoryTest {
.apply(Values.<Iterable<String>>create());
PCollection<Iterable<String>> view =
concat.apply(
- new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+ new ViewOverrideFactory.WriteView<String, Iterable<String>>(pCollectionView));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
- when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+ when(context.createPCollectionViewWriter(concat, pCollectionView)).thenReturn(viewWriter);
CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94d8d70..556cac5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -59,8 +59,7 @@ public class ViewOverrideFactoryTest implements Serializable {
@Test
public void replacementGetViewReturnsOriginal() {
final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
- final PCollectionView<List<Integer>> view =
- PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+ final PCollectionView<List<Integer>> view = ints.apply(View.<Integer>asList());
PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
factory.getReplacementTransform(
AppliedPTransform
@@ -89,7 +88,7 @@ public class ViewOverrideFactoryTest implements Serializable {
// so not asserted one way or the other
assertThat(
replacementView.getTagInternal(),
- equalTo(view.getTagInternal()));
+ equalTo((TupleTag) view.getTagInternal()));
assertThat(
replacementView.getViewFn(),
Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 79a23cc..cffcc5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
-import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
@@ -56,15 +55,14 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -217,9 +215,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
PCollection<Long> inputCount = p.apply(Create.of(countValue));
- PCollectionView<Long> elementCountView =
- PCollectionViews.singletonView(
- inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ PCollectionView<Long> elementCountView = inputCount.apply(
+ View.<Long>asSingleton().withDefaultValue(countValue));
CalculateShardsFn fn = new CalculateShardsFn(3);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index cec01f8..aa5cc39 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -307,7 +307,6 @@ class FlinkStreamingTransformTranslators {
intToViewMapping.put(count, sideInput);
tagToIntMapping.put(tag, count);
count++;
- Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index f275290..fb3f375 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collections;
@@ -24,8 +25,10 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -35,6 +38,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
* A {@link SideInputReader} for the Flink Batch Runner.
*/
public class FlinkSideInputReader implements SideInputReader {
+ /** A {@link MultimapView} which always returns an empty iterable. */
+ private static final MultimapView EMPTY_MULTMAP_VIEW = new MultimapView() {
+ @Override
+ public Iterable get(Object o) {
+ return Collections.EMPTY_LIST;
+ }
+ };
private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -42,6 +52,16 @@ public class FlinkSideInputReader implements SideInputReader {
public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
RuntimeContext runtimeContext) {
+ for (PCollectionView<?> view : indexByView.keySet()) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn(),
+ view.getTagInternal().getId());
+ }
sideInputs = new HashMap<>();
for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
@@ -53,7 +73,7 @@ public class FlinkSideInputReader implements SideInputReader {
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
checkNotNull(view, "View passed to sideInput cannot be null");
- TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+ TupleTag<?> tag = view.getTagInternal();
checkNotNull(
sideInputs.get(tag),
"Side input for " + view + " not available.");
@@ -63,7 +83,8 @@ public class FlinkSideInputReader implements SideInputReader {
tag.getId(), new SideInputInitializer<>(view));
T result = sideInputs.get(window);
if (result == null) {
- result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ result = viewFn.apply(EMPTY_MULTMAP_VIEW);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index 12222b4..782f72a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -17,12 +17,23 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
@@ -30,24 +41,33 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
* {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
* from window to side input.
*/
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
- implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+public class SideInputInitializer<ViewT>
+ implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> {
PCollectionView<ViewT> view;
public SideInputInitializer(PCollectionView<ViewT> view) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn(),
+ view.getTagInternal().getId());
this.view = view;
}
@Override
public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
- Iterable<WindowedValue<ElemT>> inputValues) {
+ Iterable<WindowedValue<?>> inputValues) {
// first partition into windows
- Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
- for (WindowedValue<ElemT> value: inputValues) {
+ Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+ for (WindowedValue<KV<?, ?>> value
+ : (Iterable<WindowedValue<KV<?, ?>>>) (Iterable) inputValues) {
for (BoundedWindow window: value.getWindows()) {
- List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+ List<WindowedValue<KV<?, ?>>> windowedValues = partitionedElements.get(window);
if (windowedValues == null) {
windowedValues = new ArrayList<>();
partitionedElements.put(window, windowedValues);
@@ -58,14 +78,20 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
- for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+ for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements:
partitionedElements.entrySet()) {
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> elementsIterable =
- (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
- resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+ ViewFn<MultimapView, ViewT> viewFn = (ViewFn<MultimapView, ViewT>) view.getViewFn();
+ Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ resultMap.put(elements.getKey(), viewFn.apply(InMemoryMultimapSideInputView.fromIterable(
+ keyCoder,
+ (Iterable) Iterables.transform(elements.getValue(),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ }))));
}
return resultMap;
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad17de8..33ac024f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -47,16 +48,19 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -71,6 +75,7 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -84,26 +89,19 @@ public class DoFnOperatorTest {
// views and windows for testing side inputs
private static final long WINDOW_MSECS_1 = 100;
private static final long WINDOW_MSECS_2 = 500;
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
- private PCollectionView<Iterable<String>> view1 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy1);
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
- private PCollectionView<Iterable<String>> view2 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy2);
+ private PCollectionView<Iterable<String>> view1;
+ private PCollectionView<Iterable<String>> view2;
+
+ @Before
+ public void setUp() {
+ PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+ view1 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+ .apply(View.<String>asIterable());
+ view2 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+ .apply(View.<String>asIterable());
+ }
@Test
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 81e7a97..cc43e27 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(9, steps.size());
+ assertEquals(10, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(8);
+ Step collectionToSingletonStep = steps.get(9);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(3, steps.size());
+ assertEquals(4, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(2);
+ Step collectionToSingletonStep = steps.get(3);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7cb8628..68e3e3c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
@@ -527,7 +528,11 @@ public final class TransformTranslator {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
PCollectionView<WriteT> output = transform.getView();
- Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+ Coder<Iterable<WindowedValue<?>>> coderInternal =
+ (Coder) IterableCoder.of(
+ WindowedValue.getFullCoder(
+ output.getCoderInternal(),
+ output.getWindowingStrategyInternal().getWindowFn().windowCoder()));
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
[6/7] beam git commit: Update Dataflow worker image to be compatible with side input changes.
Posted by lc...@apache.org.
Update Dataflow worker image to be compatible with side input changes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5a8aea0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5a8aea0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5a8aea0
Branch: refs/heads/master
Commit: d5a8aea016286106950135743c1c6de667b10e17
Parents: 12246ad
Author: Luke Cwik <lc...@google.com>
Authored: Mon Nov 13 13:09:13 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d5a8aea0/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 2e08181..9312a8e 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170926</dataflow.container_version>
+ <dataflow.container_version>beam-master-20171113</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
[2/7] beam git commit: Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.
Posted by lc...@apache.org.
Replace the View.As transforms for Dataflow batch because the entire implementation is specialized.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2f815c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2f815c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2f815c5
Branch: refs/heads/master
Commit: c2f815c581cf5d6cd72b600cef42f436c9702d85
Parents: d5a8aea
Author: Luke Cwik <lc...@google.com>
Authored: Tue Nov 14 07:47:53 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../runners/dataflow/BatchViewOverrides.java | 53 ++++-----
.../runners/dataflow/CreateDataflowView.java | 19 +++-
.../beam/runners/dataflow/DataflowRunner.java | 110 ++++++++++++++++---
.../dataflow/StreamingViewOverrides.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
5 files changed, 142 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 9a77b4b..8ed41cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -116,7 +116,7 @@ class BatchViewOverrides {
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
/**
* A {@link DoFn} which groups elements by window boundaries. For each group,
@@ -193,11 +193,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ public PCollection<?> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+ private <W extends BoundedWindow> PCollection<?>
applyInternal(PCollection<KV<K, V>> input) {
try {
return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */);
@@ -216,7 +216,7 @@ class BatchViewOverrides {
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
+ private <W extends BoundedWindow> PCollection<?>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
@@ -280,7 +280,7 @@ class BatchViewOverrides {
* a warning to users to specify a deterministic key coder.
*/
static class BatchViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+ extends PTransform<PCollection<KV<K, V>>, PCollection<?>> {
/**
* A {@link PTransform} that groups elements by the hash of window's byte representation
* if the input {@link PCollection} is not within the global window. Otherwise by the hash
@@ -672,11 +672,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ public PCollection<?> expand(PCollection<KV<K, V>> input) {
return this.<BoundedWindow>applyInternal(input);
}
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+ private <W extends BoundedWindow> PCollection<?>
applyInternal(PCollection<KV<K, V>> input) {
try {
return applyForMapLike(runner, input, view, false /* unique keys not expected */);
@@ -690,7 +690,7 @@ class BatchViewOverrides {
}
/** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
+ private <W extends BoundedWindow> PCollection<?>
applyForSingletonFallback(PCollection<KV<K, V>> input) {
@SuppressWarnings("unchecked")
Coder<W> windowCoder = (Coder<W>)
@@ -727,7 +727,7 @@ class BatchViewOverrides {
view);
}
- private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike(
+ private static <K, V, W extends BoundedWindow, ViewT> PCollection<?> applyForMapLike(
DataflowRunner runner,
PCollection<KV<K, V>> input,
PCollectionView<ViewT> view,
@@ -804,9 +804,10 @@ class BatchViewOverrides {
PCollectionList.of(ImmutableList.of(
perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata));
- Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
- .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view));
- return view;
+ PCollection<IsmRecord<WindowedValue<V>>> flattenedOutputs =
+ Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections());
+ flattenedOutputs.apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>forBatch(view));
+ return flattenedOutputs;
}
@Override
@@ -843,7 +844,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
/**
* A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
@@ -900,7 +901,7 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<T> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -913,7 +914,7 @@ class BatchViewOverrides {
view);
}
- static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
+ static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
applyForSingleton(
DataflowRunner runner,
PCollection<T> input,
@@ -936,8 +937,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
@Override
@@ -969,7 +970,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
/**
* A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
* global window. Each {@link IsmRecord} has
@@ -1054,11 +1055,11 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
return applyForIterableLike(runner, input, view);
}
- static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike(
+ static <T, W extends BoundedWindow, ViewT> PCollection<?> applyForIterableLike(
DataflowRunner runner,
PCollection<T> input,
PCollectionView<ViewT> view) {
@@ -1081,8 +1082,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input
@@ -1092,8 +1093,8 @@ class BatchViewOverrides {
runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
reifiedPerWindowAndSorted.apply(
- CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
- return view;
+ CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view));
+ return reifiedPerWindowAndSorted;
}
@Override
@@ -1127,7 +1128,7 @@ class BatchViewOverrides {
* </ul>
*/
static class BatchViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ extends PTransform<PCollection<T>, PCollection<?>> {
private final DataflowRunner runner;
private final PCollectionView<Iterable<T>> view;
@@ -1140,7 +1141,7 @@ class BatchViewOverrides {
}
@Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ public PCollection<?> expand(PCollection<T> input) {
return BatchViewAsList.applyForIterableLike(runner, input, view);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index 3b01d69..10888c2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -25,20 +25,29 @@ import org.apache.beam.sdk.values.PCollectionView;
/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
public class CreateDataflowView<ElemT, ViewT>
extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
- public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
- return new CreateDataflowView<>(view);
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) {
+ return new CreateDataflowView<>(view, false);
+ }
+
+ public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) {
+ return new CreateDataflowView<>(view, true);
}
private final PCollectionView<ViewT> view;
+ private final boolean streaming;
- private CreateDataflowView(PCollectionView<ViewT> view) {
+ private CreateDataflowView(PCollectionView<ViewT> view, boolean streaming) {
this.view = view;
+ this.streaming = streaming;
}
@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+ if (streaming) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
+ }
+ return (PCollection) view.getPCollection();
}
public PCollectionView<ViewT> getView() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a20a0f..72e4f83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -58,6 +58,7 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
@@ -124,6 +125,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
@@ -139,7 +141,6 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
@@ -395,28 +396,28 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
BatchStatefulParDoOverrides.singleOutputOverrideFactory(options)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsMap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsMultimap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsSingleton.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsList.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
- new ReflectiveOneToOneOverrideFactory(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveViewOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
overridesBuilder
@@ -435,6 +436,81 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return overridesBuilder.build();
}
+ /**
+ * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the
+ * new replacement transform uses the supplied PCollectionView and does not create another instance.
+ */
+ private static class ReflectiveViewOverrideFactory<InputT, ViewT>
+ implements PTransformOverrideFactory<PCollection<InputT>,
+ PValue, PTransform<PCollection<InputT>, PValue>> {
+
+ private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement;
+ private final DataflowRunner runner;
+
+ private ReflectiveViewOverrideFactory(
+ Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement,
+ DataflowRunner runner) {
+ this.replacement = replacement;
+ this.runner = runner;
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform(
+ final AppliedPTransform<PCollection<InputT>,
+ PValue,
+ PTransform<PCollection<InputT>, PValue>> transform) {
+
+ final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>();
+ transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() {
+ // Stores whether we have entered the expected composite view transform.
+ private boolean tracking = false;
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (transform.getTransform() == node.getTransform()) {
+ tracking = true;
+ }
+ return super.enterCompositeTransform(node);
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ if (tracking && node.getTransform() instanceof CreatePCollectionView) {
+ checkState(
+ viewTransformRef.compareAndSet(null, (CreatePCollectionView) node.getTransform()),
+ "Found more then one instance of a CreatePCollectionView when "
+ + "attempting to replace %s, found [%s, %s]",
+ replacement, viewTransformRef.get(), node.getTransform());
+ }
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (transform.getTransform() == node.getTransform()) {
+ tracking = false;
+ }
+ }
+ });
+
+ checkState(viewTransformRef.get() != null,
+ "Expected to find CreatePCollectionView contained within %s",
+ transform.getTransform());
+ PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep =
+ InstanceBuilder.ofType(replacement)
+ .withArg(DataflowRunner.class, runner)
+ .withArg(CreatePCollectionView.class, viewTransformRef.get())
+ .build();
+ return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep);
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) {
+ // We do not replace any of the outputs because we expect that the new PTransform will re-use the original
+ // PCollectionView that was returned.
+ return ImmutableMap.of();
+ }
+ }
+
private static class ReflectiveOneToOneOverrideFactory<
InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
extends SingleInputOutputOverrideFactory<
@@ -1258,19 +1334,21 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
private static class Deduplicate<T>
extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> {
+
// Use a finite set of keys to improve bundling. Without this, the key space
// will be the space of ids which is potentially very large, which results in much
// more per-key overhead.
private static final int NUM_RESHARD_KEYS = 10000;
+
@Override
public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
return input
.apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() {
- @Override
- public Integer apply(ValueWithRecordId<T> value) {
- return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
- }
- }))
+ @Override
+ public Integer apply(ValueWithRecordId<T> value) {
+ return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS;
+ }
+ }))
// Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..69099c6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -68,7 +68,7 @@ class StreamingViewOverrides {
return input
.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
- .apply(CreateDataflowView.<ElemT, ViewT>of(view));
+ .apply(CreateDataflowView.<ElemT, ViewT>forStreaming(view));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index cc43e27..e03abb9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(10, steps.size());
+ assertEquals(5, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(9);
+ Step collectionToSingletonStep = steps.get(4);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(4, steps.size());
+ assertEquals(3, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(3);
+ Step collectionToSingletonStep = steps.get(2);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
[5/7] beam git commit: Rebase and update code to honor findbugs @Nullable/@Nonnull conversion
Posted by lc...@apache.org.
Rebase and update code to honor findbugs @Nullable/@Nonnull conversion
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12246ad0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12246ad0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12246ad0
Branch: refs/heads/master
Commit: 12246ad0c6172fa235d96c69466b4876cd649523
Parents: 5e2593d
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:18:13 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/Materializations.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/12246ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index e606919..0e66c5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
@@ -43,7 +44,7 @@ public class Materializations {
* use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
*/
public interface MultimapView<K, V> {
- Iterable<V> get(K k);
+ Iterable<V> get(@Nullable K k);
}
/**
[7/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
Posted by lc...@apache.org.
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
This closes #4011
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30886ace
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30886ace
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30886ace
Branch: refs/heads/master
Commit: 30886acee9432d3bdfeb679c8dd717244bb86fb9
Parents: 7ce0a82 accb208
Author: Luke Cwik <lc...@google.com>
Authored: Wed Nov 15 09:00:23 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 09:00:23 2017 -0800
----------------------------------------------------------------------
.../apex/translation/ParDoTranslatorTest.java | 3 +-
.../core/construction/PTransformMatchers.java | 3 +-
.../core/construction/ParDoTranslation.java | 17 +-
.../CreatePCollectionViewTranslationTest.java | 10 +-
.../construction/PTransformMatchersTest.java | 33 +-
.../core/construction/ParDoTranslationTest.java | 7 +-
.../core/InMemoryMultimapSideInputView.java | 62 +++
.../beam/runners/core/SideInputHandler.java | 63 ++--
.../core/InMemoryMultimapSideInputViewTest.java | 53 +++
.../beam/runners/core/SideInputHandlerTest.java | 89 +++--
.../beam/runners/direct/SideInputContainer.java | 38 +-
.../runners/direct/EvaluationContextTest.java | 44 ++-
.../runners/direct/SideInputContainerTest.java | 226 +++++------
.../direct/ViewEvaluatorFactoryTest.java | 13 +-
.../runners/direct/ViewOverrideFactoryTest.java | 9 +-
.../direct/WriteWithShardingFactoryTest.java | 9 +-
.../FlinkStreamingTransformTranslators.java | 1 -
.../functions/FlinkSideInputReader.java | 27 +-
.../functions/SideInputInitializer.java | 50 ++-
.../flink/streaming/DoFnOperatorTest.java | 40 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../runners/dataflow/BatchViewOverrides.java | 69 ++--
.../runners/dataflow/CreateDataflowView.java | 21 +-
.../beam/runners/dataflow/DataflowRunner.java | 152 +++++++-
.../dataflow/StreamingViewOverrides.java | 2 +-
.../spark/translation/TransformTranslator.java | 7 +-
.../spark/util/SparkSideInputReader.java | 50 ++-
.../org/apache/beam/sdk/transforms/Combine.java | 13 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 20 +-
.../beam/sdk/transforms/Materializations.java | 30 +-
.../org/apache/beam/sdk/transforms/View.java | 67 +++-
.../org/apache/beam/sdk/transforms/ViewFn.java | 6 +-
.../apache/beam/sdk/values/PCollectionView.java | 7 +-
.../beam/sdk/values/PCollectionViews.java | 256 ++++++-------
.../sdk/testing/PCollectionViewTesting.java | 375 +++----------------
.../beam/sdk/transforms/DoFnTesterTest.java | 12 +-
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 14 +-
37 files changed, 997 insertions(+), 903 deletions(-)
----------------------------------------------------------------------
[3/7] beam git commit: [BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
Posted by lc...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 6c91088..932ccd6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -20,11 +20,17 @@ package org.apache.beam.runners.spark.util;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -34,7 +40,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
/**
- * A {@link SideInputReader} for thw SparkRunner.
+ * A {@link SideInputReader} for the SparkRunner.
*/
public class SparkSideInputReader implements SideInputReader {
private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -60,26 +66,30 @@ public class SparkSideInputReader implements SideInputReader {
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
- Iterable<WindowedValue<?>> availableSideInputs =
- (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
- Iterable<WindowedValue<?>> sideInputForWindow =
- Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
- @Override
- public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
- if (sideInputCandidate == null) {
- return false;
- }
- // first match of a sideInputWindow to the elementWindow is good enough.
- for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
- if (sideInputCandidateWindow.equals(sideInputWindow)) {
- return true;
+ Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
+ (Iterable<WindowedValue<KV<?, ?>>>) windowedBroadcastHelper.getValue().getValue();
+ Iterable<KV<?, ?>> sideInputForWindow =
+ Iterables.transform(
+ Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+ @Override
+ public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+ if (sideInputCandidate == null) {
+ return false;
+ }
+ return Iterables.contains(sideInputCandidate.getWindows(), sideInputWindow);
}
- }
- // no match found.
- return false;
- }
- });
- return view.getViewFn().apply(sideInputForWindow);
+ }),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ });
+
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ return viewFn.apply(
+ InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) sideInputForWindow));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3c5b55b..f86e9cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1274,14 +1275,16 @@ public class Combine {
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- PCollectionView<OutputT> view =
- PCollectionViews.singletonView(
- combined,
+ PCollection<KV<Void, OutputT>> materializationInput =
+ combined.apply(new VoidKeyToMultimapMaterialization<OutputT>());
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ materializationInput,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
- combined.getCoder());
- combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+ combined.getCoder());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, OutputT>, OutputT>of(view));
return view;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 6168710..d71f0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -602,7 +601,24 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return windowValue;
}
}
- return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ // Fallback to returning the default materialization if no data was supplied.
+ // This is really to support singleton views with default values.
+
+ // TODO: Update this to supply a materialization dependent on actual URN of materialization.
+ // Currently the SDK only supports the multimap materialization and it expects a
+ // mapping function.
+ checkState(Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "Only materializations of type %s supported, received %s",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn());
+ return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn()).apply(
+ new Materializations.MultimapView<Object, Object>() {
+ @Override
+ public Iterable<Object> get(Object o) {
+ return Collections.emptyList();
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index 6e4f83d..e606919 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -32,29 +31,37 @@ import org.apache.beam.sdk.util.WindowedValue;
@Internal
public class Materializations {
/**
- * The URN for a {@link Materialization} where the primitive view type is an iterable of fully
+ * The URN for a {@link Materialization} where the primitive view type is an multimap of fully
* specified windowed values.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static final String ITERABLE_MATERIALIZATION_URN =
- "urn:beam:sideinput:materialization:iterable:0.1";
+ public static final String MULTIMAP_MATERIALIZATION_URN =
+ "urn:beam:sideinput:materialization:multimap:0.1";
+
+ /**
+ * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to
+ * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
+ */
+ public interface MultimapView<K, V> {
+ Iterable<V> get(K k);
+ }
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
- * <p>A {@link Materialization} where the primitive view type is an iterable of fully specified
- * windowed values.
+ * <p>A {@link Materialization} where the primitive view type is a multimap with fully
+ * specified windowed keys.
*/
@Internal
- public static <T> Materialization<Iterable<WindowedValue<T>>> iterable() {
- return new IterableMaterialization<>();
+ public static <K, V> Materialization<MultimapView<K, V>> multimap() {
+ return new MultimapMaterialization<>();
}
- private static class IterableMaterialization<T>
- implements Materialization<Iterable<WindowedValue<T>>> {
+ private static class MultimapMaterialization<K, V>
+ implements Materialization<MultimapView<K, V>> {
@Override
public String getUrn() {
- return ITERABLE_MATERIALIZATION_URN;
+ return MULTIMAP_MATERIALIZATION_URN;
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index eaa7925..ec8233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
@@ -258,9 +260,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<List<T>> view =
- PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<T, List<T>>of(view));
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<T>());
+ PCollectionView<List<T>> view = PCollectionViews.listView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, T>, List<T>>of(view));
return view;
}
}
@@ -285,9 +291,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<T>());
+ PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, T>, Iterable<T>>of(view));
return view;
}
}
@@ -428,9 +438,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+ PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, Iterable<V>>>of(view));
return view;
}
}
@@ -463,9 +477,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+ PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, V>>of(view));
return view;
}
}
@@ -474,6 +492,31 @@ public class View {
// Internal details below
/**
+ * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+ *
+ * <p>TODO: Replace this materialization with specializations that optimize the various SDK
+ * requested views.
+ */
+ @Internal
+ static class VoidKeyToMultimapMaterialization<T>
+ extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+ private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+ @ProcessElement
+ public void processElement(ProcessContext ctxt) {
+ ctxt.output(KV.of((Void) null, ctxt.element()));
+ }
+ }
+
+ @Override
+ public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+ PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+ output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+ return output;
+ }
+ }
+
+ /**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Creates a primitive {@link PCollectionView}.
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
index d51a917..9291bc6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView;
* {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
* available in the SDK.
*
- * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner
- * {@code <ViewT>} the type of the value(s) accessible via this {@link PCollectionView}
+ * @param <PrimitiveViewT> the type of the underlying primitive view required
+ * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView}
*/
@Internal
public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
@@ -49,5 +49,5 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
/**
* A function to adapt a primitive view type to a desired view type.
*/
- public abstract ViewT apply(PrimitiveViewT contents);
+ public abstract ViewT apply(PrimitiveViewT primitiveViewT);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index 7d87412..c212c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* A {@link PCollectionView PCollectionView<T>} is an immutable view of a {@link PCollection}
@@ -72,7 +71,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
+ TupleTag<?> getTagInternal();
/**
* <b>For internal use only.</b>
@@ -83,7 +82,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
+ ViewFn<?, T> getViewFn();
/**
* <b>For internal use only.</b>
@@ -116,5 +115,5 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- Coder<Iterable<WindowedValue<?>>> getCoderInternal();
+ Coder<?> getCoderInternal();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index ed8fb76..30277f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -17,14 +17,13 @@
*/
package org.apache.beam.sdk.values;
-import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,16 +35,15 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
@@ -56,88 +54,79 @@ import org.apache.beam.sdk.util.WindowedValue;
public class PCollectionViews {
/**
- * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
- * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<T>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*
* <p>If {@code hasDefault} is {@code true}, then the view will take on the value
* {@code defaultValue} for any empty windows.
*/
public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
- PCollection<T> pCollection,
+ PCollection<KV<Void, T>> pCollection,
WindowingStrategy<?, W> windowingStrategy,
boolean hasDefault,
@Nullable T defaultValue,
- Coder<T> valueCoder) {
+ Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
- new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+ new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
+ PCollection<KV<Void, T>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableViewFn<T>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
+ PCollection<KV<Void, T>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new ListViewFn<T>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
+ PCollection<KV<Void, KV<K, V>>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MapViewFn<K, V>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
- * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
+ PCollection<KV<Void, KV<K, V>>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MultimapViewFn<K, V>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
@@ -153,18 +142,15 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
+ * Implementation which is able to adapt a multimap materialization to a {@code T}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#singletonView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
+ public static class SingletonViewFn<T>
+ extends ViewFn<MultimapView<Void, T>, T> {
@Nullable private byte[] encodedDefaultValue;
@Nullable private transient T defaultValue;
@Nullable private Coder<T> valueCoder;
@@ -204,9 +190,12 @@ public class PCollectionViews {
}
// Lazily decode the default value once
synchronized (this) {
- if (encodedDefaultValue != null && defaultValue == null) {
+ if (encodedDefaultValue != null) {
try {
defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
+ // Clear the encoded default value to free the reference once we have the object
+ // version. Also, this will guarantee that the value will only be decoded once.
+ encodedDefaultValue = null;
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
@@ -216,84 +205,67 @@ public class PCollectionViews {
}
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public T apply(Iterable<WindowedValue<T>> contents) {
+ public T apply(MultimapView<Void, T> primitiveViewT) {
try {
- return Iterables.getOnlyElement(contents).getValue();
+ return Iterables.getOnlyElement(primitiveViewT.get(null));
} catch (NoSuchElementException exc) {
return getDefaultValue();
} catch (IllegalArgumentException exc) {
throw new IllegalArgumentException(
- "PCollection with more than one element "
- + "accessed as a singleton view.");
+ "PCollection with more than one element accessed as a singleton view.");
}
}
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code Iterable<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#iterableView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class IterableViewFn<T>
- extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends ViewFn<MultimapView<Void, T>, Iterable<T>> {
+
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
- return Iterables.unmodifiableIterable(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
- }
- }));
+ public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) {
+ return Iterables.unmodifiableIterable(primitiveViewT.get(null));
}
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#listView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
+ public static class ListViewFn<T>
+ extends ViewFn<MultimapView<Void, T>, List<T>> {
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public List<T> apply(Iterable<WindowedValue<T>> contents) {
- return ImmutableList.copyOf(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
+ public List<T> apply(MultimapView<Void, T> primitiveViewT) {
+ List<T> list = new ArrayList<>();
+ for (T t : primitiveViewT.get(null)) {
+ list.add(t);
}
- }));
+ return Collections.unmodifiableList(list);
}
@Override
@@ -308,27 +280,29 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
- * to {@code Map<K, Iterable<V>>}.
+ * Implementation which is able to adapt a multimap materialization to a
+ * {@code Map<K, Iterable<V>>}.
+ *
+ * <p>For internal use only.
*
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
+ * <p>Instantiate via {@link PCollectionViews#multimapView}.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class MultimapViewFn<K, V>
- extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+ extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> {
@Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ public Map<K, Iterable<V>> apply(
+ MultimapView<Void, KV<K, V>> primitiveViewT) {
+ // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+ // using structural value equality.
Multimap<K, V> multimap = HashMultimap.create();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- multimap.put(kv.getKey(), kv.getValue());
+ for (KV<K, V> elem : primitiveViewT.get(null)) {
+ multimap.put(elem.getKey(), elem.getValue());
}
// Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -338,32 +312,31 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
- * {@code Map<K, V>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
+ *
+ * <p>For internal use only.
*
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
+ * <p>Instantiate via {@link PCollectionViews#mapView}.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+ public static class MapViewFn<K, V>
+ extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> {
+
@Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+ return Materializations.multimap();
}
- /**
- * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
- */
@Override
- public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
+ // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+ // using structural value equality.
Map<K, V> map = new HashMap<>();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- if (map.containsKey(kv.getKey())) {
- throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
+ for (KV<K, V> elem : primitiveViewT.get(null)) {
+ if (map.containsKey(elem.getKey())) {
+ throw new IllegalArgumentException("Duplicate values for " + elem.getKey());
}
- map.put(kv.getKey(), kv.getValue());
+ map.put(elem.getKey(), elem.getValue());
}
return Collections.unmodifiableMap(map);
}
@@ -375,14 +348,14 @@ public class PCollectionViews {
*
* <p>For internal use only.
*/
- public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
+ public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow>
extends PValueBase
implements PCollectionView<ViewT> {
/** The {@link PCollection} this view was originally created from. */
private transient PCollection<ElemT> pCollection;
/** A unique tag for the view, typed according to the elements underlying the view. */
- private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+ private TupleTag<PrimitiveViewT> tag;
private WindowMappingFn<W> windowMappingFn;
@@ -390,12 +363,12 @@ public class PCollectionViews {
private WindowingStrategy<?, W> windowingStrategy;
/** The coder for the elements underlying the view. */
- private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder;
+ private @Nullable Coder<ElemT> coder;
/**
* The typed {@link ViewFn} for this view.
*/
- private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+ private ViewFn<PrimitiveViewT, ViewT> viewFn;
/**
* Call this constructor to initialize the fields for which this base class provides
@@ -403,11 +376,10 @@ public class PCollectionViews {
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ TupleTag<PrimitiveViewT> tag,
+ ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
+ WindowingStrategy<?, W> windowingStrategy) {
super(pCollection.getPipeline());
this.pCollection = pCollection;
if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
@@ -417,9 +389,7 @@ public class PCollectionViews {
this.tag = tag;
this.windowingStrategy = windowingStrategy;
this.viewFn = viewFn;
- this.coder =
- IterableCoder.of(WindowedValue.getFullCoder(
- valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+ this.coder = pCollection.getCoder();
}
/**
@@ -428,27 +398,20 @@ public class PCollectionViews {
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
+ WindowingStrategy<?, W> windowingStrategy) {
this(
pCollection,
- new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+ new TupleTag<PrimitiveViewT>(),
viewFn,
windowMappingFn,
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
@Override
- public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
- // Safe cast: it is required that the rest of the SDK maintain the invariant
- // that a PCollectionView is only provided an iterable for the elements of an
- // appropriately typed PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
- return untypedViewFn;
+ public ViewFn<PrimitiveViewT, ViewT> getViewFn() {
+ return viewFn;
}
@Override
@@ -467,13 +430,8 @@ public class PCollectionViews {
* <p>For internal use only by runner implementors.
*/
@Override
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
- // Safe cast: It is required that the rest of the SDK maintain the invariant that
- // this tag is only used to access the contents of an appropriately typed underlying
- // PCollection
- @SuppressWarnings({"rawtypes", "unchecked"})
- TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
- return untypedTag;
+ public TupleTag<?> getTagInternal() {
+ return tag;
}
/**
@@ -488,12 +446,8 @@ public class PCollectionViews {
}
@Override
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
- // Safe cast: It is required that the rest of the SDK only use this untyped coder
- // for the elements of an appropriately typed underlying PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
- return untypedCoder;
+ public Coder<?> getCoderInternal() {
+ return coder;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index aaf8b91..e7fd9b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -18,344 +18,57 @@
package org.apache.beam.sdk.testing;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
/**
- * Methods for creating and using {@link PCollectionView} instances.
+ * Methods for testing {@link PCollectionView}s.
*/
public final class PCollectionViewTesting {
-
- // Do not instantiate; static methods only
- private PCollectionViewTesting() { }
-
- /**
- * The length of the default window, which is an {@link IntervalWindow}, but kept encapsulated
- * as it is not for the user to know what sort of window it is.
- */
- private static final long DEFAULT_WINDOW_MSECS = 1000 * 60 * 60;
-
- /**
- * A default windowing strategy. Tests that are not concerned with the windowing
- * strategy should not specify it, and all views will use this.
- */
- public static final WindowingStrategy<?, ?> DEFAULT_WINDOWING_STRATEGY =
- WindowingStrategy.of(FixedWindows.of(new Duration(DEFAULT_WINDOW_MSECS)));
-
- /**
- * A default window into which test elements will be placed, if the window is
- * not explicitly overridden.
- */
- public static final BoundedWindow DEFAULT_NONEMPTY_WINDOW =
- new IntervalWindow(new Instant(0), new Instant(DEFAULT_WINDOW_MSECS));
-
- /**
- * A timestamp in the {@link #DEFAULT_NONEMPTY_WINDOW}.
- */
- public static final Instant DEFAULT_TIMESTAMP = DEFAULT_NONEMPTY_WINDOW.maxTimestamp().minus(1);
-
- /**
- * A window into which no element will be placed by methods in this class, unless explicitly
- * requested.
- */
- public static final BoundedWindow DEFAULT_EMPTY_WINDOW = new IntervalWindow(
- DEFAULT_NONEMPTY_WINDOW.maxTimestamp(),
- DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS));
-
- /**
- * A {@link ViewFn} that returns the provided contents as a fully lazy iterable.
- */
- public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
- return Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @Override
- public T apply(WindowedValue<T> windowedValue) {
- return windowedValue.getValue();
- }
- });
- }
- }
-
- /**
- * A {@link ViewFn} that traverses the whole iterable eagerly and returns the number of elements.
- *
- * <p>Only for use in testing scenarios with small collections. If there are more elements
- * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable.
- */
- public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Long apply(Iterable<WindowedValue<T>> contents) {
- return (long) Iterables.size(contents);
- }
- }
-
- /**
- * A {@link ViewFn} that always returns the value with which it is instantiated.
- */
- public static class ConstantViewFn<ElemT, ViewT>
- extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> {
- private ViewT value;
-
- public ConstantViewFn(ViewT value) {
- this.value = value;
- }
-
- @Override
- public Materialization<Iterable<WindowedValue<ElemT>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public ViewT apply(Iterable<WindowedValue<ElemT>> contents) {
- return value;
- }
- }
-
- /**
- * A {@link PCollectionView} explicitly built from a {@link TupleTag}
- * and conversion {@link ViewFn}, and an element coder, using the
- * {@link #DEFAULT_WINDOWING_STRATEGY}.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does
- * not respect the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable. It is recommended
- * that the values be prepared via {@link #contentsInDefaultWindow}.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return testingView(null, tag, viewFn, elemCoder, windowingStrategy);
- }
-
- /**
- * The default {@link Coder} used for windowed values, given an element {@link Coder}.
- */
- public static <T> Coder<WindowedValue<T>> defaultWindowedValueCoder(Coder<T> elemCoder) {
- return WindowedValue.getFullCoder(
- elemCoder, DEFAULT_WINDOWING_STRATEGY.getWindowFn().windowCoder());
- }
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
- * WindowingStrategy}, {@link Coder}, and conversion function.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
- * the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return testingView(
- pCollection,
- tag,
- viewFn,
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- elemCoder,
- windowingStrategy);
- }
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
- * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
- * the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<?> windowMappingFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return new PCollectionViewFromParts<>(
- pCollection,
- tag,
- viewFn,
- windowMappingFn,
- windowingStrategy,
- IterableCoder.of(
- WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
- }
-
- /**
- * Places the given {@code value} in the {@link #DEFAULT_NONEMPTY_WINDOW}.
- */
- public static <T> WindowedValue<T> valueInDefaultWindow(T value) {
- return WindowedValue.of(value, DEFAULT_TIMESTAMP, DEFAULT_NONEMPTY_WINDOW, PaneInfo.NO_FIRING);
- }
-
- /**
- * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
- */
- @SafeVarargs
- public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(T... values)
- throws Exception {
- List<WindowedValue<T>> windowedValues = Lists.newArrayList();
- for (T value : values) {
- windowedValues.add(valueInDefaultWindow(value));
- }
- return windowedValues;
- }
-
- /**
- * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
- */
- public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(Iterable<T> values)
- throws Exception {
- List<WindowedValue<T>> windowedValues = Lists.newArrayList();
- for (T value : values) {
- windowedValues.add(valueInDefaultWindow(value));
- }
- return windowedValues;
- }
-
- // Internal details below here
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag},
- * {@link WindowingStrategy}, and conversion function.
- *
- * <p>Instantiate via {@link #testingView}.
- */
- private static class PCollectionViewFromParts<ElemT, ViewT>
- extends PValueBase
- implements PCollectionView<ViewT> {
- private PCollection<ElemT> pCollection;
- private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
- private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
- private WindowMappingFn<?> windowMappingFn;
- private WindowingStrategy<?, ?> windowingStrategy;
- private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
- public PCollectionViewFromParts(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<?> windowMappingFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Coder<Iterable<WindowedValue<ElemT>>> coder) {
- this.pCollection = pCollection;
- this.tag = tag;
- this.viewFn = viewFn;
- this.windowMappingFn = windowMappingFn;
- this.windowingStrategy = windowingStrategy;
- this.coder = coder;
- }
-
- @Override
- public PCollection<?> getPCollection() {
- return pCollection;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
- return (TupleTag) tag;
- }
-
- @Override
- public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
- // Safe cast; runners must maintain type safety
- @SuppressWarnings({"unchecked", "rawtypes"})
- ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
- return untypedViewFn;
- }
-
- @Override
- public WindowMappingFn<?> getWindowMappingFn() {
- return windowMappingFn;
- }
-
- @Override
- public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
- return windowingStrategy;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
- return (Coder) coder;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tag);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof PCollectionView)) {
- return false;
+ public static List<Object> materializeValuesFor(
+ PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) {
+ List<Object> rval = new ArrayList<>();
+ // Currently all view materializations are the same where the data is shared underneath
+ // the void/null key. Once this changes, these materializations will differ but test code
+ // should not worry about what these look like if they are relying on the ViewFn to "undo"
+ // the conversion.
+ if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
}
- @SuppressWarnings("unchecked")
- PCollectionView<?> otherView = (PCollectionView<?>) other;
- return tag.equals(otherView.getTagInternal());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("tag", tag)
- .add("viewFn", viewFn)
- .toString();
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
- }
+ } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Unknown type of view %s. Supported views are %s.",
+ viewTransformClass.getClass(),
+ ImmutableSet.of(
+ View.AsSingleton.class,
+ View.AsIterable.class,
+ View.AsList.class,
+ View.AsMap.class,
+ View.AsMultimap.class)));
+ }
+ return Collections.unmodifiableList(rval);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5cb9e18..cff6b2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -37,9 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -297,9 +295,8 @@ public class DoFnTesterTest {
@Test
public void fnWithSideInputDefault() throws Exception {
PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of()));
- final PCollectionView<Integer> value =
- PCollectionViews.singletonView(
- pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+ final PCollectionView<Integer> value = pCollection.apply(
+ View.<Integer>asSingleton().withDefaultValue(0));
try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
tester.processElement(1);
@@ -313,9 +310,8 @@ public class DoFnTesterTest {
@Test
public void fnWithSideInputExplicit() throws Exception {
PCollection<Integer> pCollection = p.apply(Create.of(-2));
- final PCollectionView<Integer> value =
- PCollectionViews.singletonView(
- pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+ final PCollectionView<Integer> value = pCollection.apply(
+ View.<Integer>asSingleton().withDefaultValue(0));
try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1ccd5d6..7d20532 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -209,7 +209,7 @@ class BatchLoads<DestinationT>
checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
final PCollectionView<String> jobIdTokenView = createJobIdView(p);
- final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
// The user-supplied triggeringDuration is often chosen to to control how many BigQuery load
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
// is set to a large value, currently we have to buffer all the data unti the trigger fires.
@@ -295,7 +295,7 @@ class BatchLoads<DestinationT>
public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
final PCollectionView<String> jobIdTokenView = createJobIdView(p);
- final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
@@ -364,8 +364,10 @@ class BatchLoads<DestinationT>
}
// Generate the temporary-file prefix.
- private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
- return ((PCollection<String>) jobIdView.getPCollection())
+ private PCollectionView<String> createTempFilePrefixView(
+ Pipeline p, final PCollectionView<String> jobIdView) {
+ return p
+ .apply(Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
@@ -382,13 +384,13 @@ class BatchLoads<DestinationT>
resolveTempLocation(
tempLocationRoot,
"BigQueryWriteTemp",
- c.element());
+ c.sideInput(jobIdView));
LOG.info(
"Writing BigQuery temporary files to {} before loading them.",
tempLocation);
c.output(tempLocation);
}
- }))
+ }).withSideInputs(jobIdView))
.apply("TempFilePrefixView", View.<String>asSingleton());
}