You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:26 UTC
[35/50] [abbrv] beam git commit: [BEAM-2926] Migrate to using a
trivial multimap materialization within the Java SDK.
[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2593da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2593da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2593da
Branch: refs/heads/tez-runner
Commit: 5e2593daacec83e876b747d56d8c335531a54d1d
Parents: 7ce0a82
Author: Luke Cwik <lc...@google.com>
Authored: Fri Nov 10 11:08:24 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Nov 15 08:59:51 2017 -0800
----------------------------------------------------------------------
.../apex/translation/ParDoTranslatorTest.java | 3 +-
.../core/construction/PTransformMatchers.java | 3 +-
.../core/construction/ParDoTranslation.java | 17 +-
.../CreatePCollectionViewTranslationTest.java | 10 +-
.../construction/PTransformMatchersTest.java | 33 +-
.../core/construction/ParDoTranslationTest.java | 7 +-
.../core/InMemoryMultimapSideInputView.java | 62 +++
.../beam/runners/core/SideInputHandler.java | 63 ++--
.../core/InMemoryMultimapSideInputViewTest.java | 53 +++
.../beam/runners/core/SideInputHandlerTest.java | 89 +++--
.../beam/runners/direct/SideInputContainer.java | 38 +-
.../runners/direct/EvaluationContextTest.java | 44 ++-
.../runners/direct/SideInputContainerTest.java | 226 +++++------
.../direct/ViewEvaluatorFactoryTest.java | 13 +-
.../runners/direct/ViewOverrideFactoryTest.java | 9 +-
.../direct/WriteWithShardingFactoryTest.java | 9 +-
.../FlinkStreamingTransformTranslators.java | 1 -
.../functions/FlinkSideInputReader.java | 27 +-
.../functions/SideInputInitializer.java | 50 ++-
.../flink/streaming/DoFnOperatorTest.java | 40 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
.../spark/translation/TransformTranslator.java | 7 +-
.../spark/util/SparkSideInputReader.java | 50 ++-
.../org/apache/beam/sdk/transforms/Combine.java | 13 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 20 +-
.../beam/sdk/transforms/Materializations.java | 29 +-
.../org/apache/beam/sdk/transforms/View.java | 67 +++-
.../org/apache/beam/sdk/transforms/ViewFn.java | 6 +-
.../apache/beam/sdk/values/PCollectionView.java | 7 +-
.../beam/sdk/values/PCollectionViews.java | 256 ++++++-------
.../sdk/testing/PCollectionViewTesting.java | 375 +++----------------
.../beam/sdk/transforms/DoFnTesterTest.java | 12 +-
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 14 +-
33 files changed, 809 insertions(+), 856 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 73382e3..4a4ca1d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.apex.translation;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -219,7 +220,7 @@ public class ParDoTranslatorTest {
operator.beginWindow(0);
WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
- Lists.<Integer>newArrayList(22));
+ materializeValuesFor(View.asSingleton(), 22));
operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
final List<Object> results = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 0d27241..42ac73f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
@@ -304,7 +303,7 @@ public class PTransformMatchers {
}
CreatePCollectionView<?, ?> createView =
(CreatePCollectionView<?, ?>) application.getTransform();
- ViewFn<Iterable<WindowedValue<?>>, ?> viewFn = createView.getView().getViewFn();
+ ViewFn<?, ?> viewFn = createView.getView().getViewFn();
return viewFn.getClass().equals(viewFnType);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index f88cbe5..e00b912 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -73,8 +72,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
@@ -561,25 +558,19 @@ public class ParDoTranslation {
ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
- Coder<Iterable<WindowedValue<?>>> coder =
- (Coder)
- IterableCoder.of(
- FullWindowedValueCoder.of(
- pCollection.getCoder(),
- pCollection.getWindowingStrategy().getWindowFn().windowCoder()));
checkArgument(
- sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
+ sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN),
"Unknown View Materialization URN %s",
sideInput.getAccessPattern().getUrn());
PCollectionView<?> view =
new RunnerPCollectionView<>(
pCollection,
- (TupleTag<Iterable<WindowedValue<?>>>) tag,
- (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
+ (TupleTag) tag,
+ (ViewFn) viewFn,
windowMappingFn,
windowingStrategy,
- coder);
+ (Coder) pCollection.getCoder());
return view;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
index df659a8..690e3ca 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -23,12 +23,14 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -63,12 +65,11 @@ public class CreatePCollectionViewTranslationTest {
testPCollection.getWindowingStrategy(),
false,
null,
- testPCollection.getCoder())),
+ StringUtf8Coder.of())),
CreatePCollectionView.of(
PCollectionViews.listView(
testPCollection,
- testPCollection.getWindowingStrategy(),
- testPCollection.getCoder())));
+ testPCollection.getWindowingStrategy())));
}
@Parameter(0)
@@ -76,7 +77,8 @@ public class CreatePCollectionViewTranslationTest {
public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
- private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+ private static final PCollection<KV<Void, String>> testPCollection =
+ p.apply(Create.of(KV.of((Void) null, "one")));
@Test
public void testEncodedProto() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 324e38d..c2dab4c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -54,8 +54,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -376,9 +373,8 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = view.getViewFn();
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+ ViewFn<?, ?> viewFn = view.getViewFn();
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -388,23 +384,10 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFnDifferentViewFn() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn =
- new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() {
- @Override
- public Materialization<Iterable<WindowedValue<?>>> getMaterialization() {
- @SuppressWarnings({"rawtypes", "unchecked"})
- Materialization<Iterable<WindowedValue<?>>> materialization =
- (Materialization) Materializations.iterable();
- return materialization;
- }
-
- @Override
- public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) {
- return Collections.emptyList();
- }
- };
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
+
+ // Purposely create a subclass to get a different class than what was expected.
+ ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {};
CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view);
PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass());
@@ -414,9 +397,7 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void createViewWithViewFnNotCreatePCollectionView() {
PCollection<Integer> input = p.apply(Create.of(1));
- PCollectionView<Iterable<Integer>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-
+ PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable());
PTransformMatcher matcher =
PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass());
assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index b79947e..83594f1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -166,7 +167,8 @@ public class ParDoTranslationTest {
view.getPCollection(),
protoTransform,
rehydratedComponents);
- assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
+ assertThat(restoredView.getTagInternal(),
+ Matchers.<TupleTag<?>>equalTo(view.getTagInternal()));
assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
assertThat(
restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass()));
@@ -174,7 +176,8 @@ public class ParDoTranslationTest {
restoredView.getWindowingStrategyInternal(),
Matchers.<WindowingStrategy<?, ?>>equalTo(
view.getWindowingStrategyInternal().fixDefaults()));
- assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
+ assertThat(restoredView.getCoderInternal(),
+ Matchers.<Coder<?>>equalTo(view.getCoderInternal()));
}
String mainInputId = sdkComponents.registerPCollection(mainInput);
assertThat(
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
new file mode 100644
index 0000000..b451547
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * An in-memory representation of {@link MultimapView}.
+ */
+public class InMemoryMultimapSideInputView<K, V> implements Materializations.MultimapView<K, V> {
+
+ /**
+ * Creates a {@link MultimapView} from the provided values. The provided {@link Coder} is used
+ * to guarantee structural equality for keys instead of assuming Java object equality.
+ */
+ public static <K, V> MultimapView<K, V> fromIterable(
+ Coder<K> keyCoder, Iterable<KV<K, V>> values) {
+ // We specifically use an array list multimap to allow for:
+ // * null keys
+ // * null values
+ // * duplicate values
+ Multimap<Object, Object> multimap = ArrayListMultimap.create();
+ for (KV<K, V> value : values) {
+ multimap.put(keyCoder.structuralValue(value.getKey()), value.getValue());
+ }
+ return new InMemoryMultimapSideInputView(keyCoder, Multimaps.unmodifiableMultimap(multimap));
+ }
+
+ private final Coder<K> keyCoder;
+ private final Multimap<Object, V> structuralKeyToValuesMap;
+
+ private InMemoryMultimapSideInputView(Coder<K> keyCoder, Multimap<Object, V> data) {
+ this.keyCoder = keyCoder;
+ this.structuralKeyToValuesMap = data;
+ }
+
+ @Override
+ public Iterable<V> get(K k) {
+ return structuralKeyToValuesMap.get(keyCoder.structuralValue(k));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 3b37702..3ff4c94 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -17,22 +17,28 @@
*/
package org.apache.beam.runners.core;
-import java.util.ArrayList;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
/**
@@ -58,7 +64,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
/**
* State internals that are scoped not to the key of a value but are global. The state can still
- * be keep locally but if side inputs are broadcast to all parallel operators then all will
+ * be kept locally but if side inputs are broadcast to all parallel operators then all will
* have the same view of the state.
*/
private final StateInternals stateInternals;
@@ -80,7 +86,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
*/
private final Map<
PCollectionView<?>,
- StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags;
+ StateTag<ValueState<Iterable<?>>>> sideInputContentsTags;
/**
* Creates a new {@code SideInputHandler} for the given side inputs that uses
@@ -94,7 +100,15 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
this.availableWindowsTags = new HashMap<>();
this.sideInputContentsTags = new HashMap<>();
- for (PCollectionView<?> sideInput: sideInputs) {
+ for (PCollectionView<?> sideInput : sideInputs) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ sideInput.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ sideInput.getViewFn().getMaterialization().getUrn(),
+ sideInput.getTagInternal().getId());
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
@@ -114,9 +128,9 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
availableWindowsTags.put(sideInput, availableTag);
- Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
- StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder);
+ StateTag<ValueState<Iterable<?>>> stateTag =
+ StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(),
+ (Coder) IterableCoder.of(sideInput.getCoderInternal()));
sideInputContentsTags.put(sideInput, stateTag);
}
}
@@ -129,7 +143,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
public void addSideInputValue(
PCollectionView<?> sideInput,
WindowedValue<Iterable<?>> value) {
-
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) sideInput
@@ -137,19 +150,13 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
.getWindowFn()
.windowCoder();
- // reify the WindowedValue
- List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>();
- for (Object e: value.getValue()) {
- inputWithReifiedWindows.add(value.withValue(e));
- }
-
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
+ StateTag<ValueState<Iterable<?>>> stateTag =
sideInputContentsTags.get(sideInput);
- for (BoundedWindow window: value.getWindows()) {
+ for (BoundedWindow window : value.getWindows()) {
stateInternals
.state(StateNamespaces.window(windowCoder, window), stateTag)
- .write(inputWithReifiedWindows);
+ .write(value.getValue());
stateInternals
.state(StateNamespaces.global(), availableWindowsTags.get(sideInput))
@@ -159,28 +166,32 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
@Nullable
@Override
- public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
-
+ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
- (Coder<BoundedWindow>) sideInput
+ (Coder<BoundedWindow>) view
.getWindowingStrategyInternal()
.getWindowFn()
.windowCoder();
- StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag =
- sideInputContentsTags.get(sideInput);
+ StateTag<ValueState<Iterable<?>>> stateTag =
+ sideInputContentsTags.get(view);
- ValueState<Iterable<WindowedValue<?>>> state =
+ ValueState<Iterable<?>> state =
stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
- @Nullable Iterable<WindowedValue<?>> elements = state.read();
+ // TODO: Add support for choosing which representation is contained based upon the
+ // side input materialization. We currently can assume that we always have a multimap
+ // materialization as that is the only supported type within the Java SDK.
+ @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read();
if (elements == null) {
elements = Collections.emptyList();
}
- return sideInput.getViewFn().apply(elements);
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ return viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
new file mode 100644
index 0000000..6840355
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link InMemoryMultimapSideInputView}. */
+@RunWith(JUnit4.class)
+public class InMemoryMultimapSideInputViewTest {
+ @Test
+ public void testStructuralKeyEquality() {
+ MultimapView<byte[], Integer> view = InMemoryMultimapSideInputView.fromIterable(
+ ByteArrayCoder.of(),
+ ImmutableList.of(KV.of(new byte[]{ 0x00 }, 0), KV.of(new byte[]{ 0x01 }, 1)));
+ assertEquals(view.get(new byte[]{ 0x00 }), ImmutableList.of(0));
+ assertEquals(view.get(new byte[]{ 0x01 }), ImmutableList.of(1));
+ assertEquals(view.get(new byte[]{ 0x02 }), ImmutableList.of());
+ }
+
+ @Test
+ public void testValueGrouping() {
+ MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable(
+ StringUtf8Coder.of(),
+ ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a2"), KV.of("B", "b1")));
+ assertEquals(view.get("A"), ImmutableList.of("a1", "a2"));
+ assertEquals(view.get("B"), ImmutableList.of("b1"));
+ assertEquals(view.get("C"), ImmutableList.of());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
index f9e0aaf..7cbd1b0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java
@@ -17,24 +17,28 @@
*/
package org.apache.beam.runners.core;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -47,26 +51,19 @@ public class SideInputHandlerTest {
private static final long WINDOW_MSECS_1 = 100;
private static final long WINDOW_MSECS_2 = 500;
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
- private PCollectionView<Iterable<String>> view1 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy1);
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
- private PCollectionView<Iterable<String>> view2 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy2);
+ private PCollectionView<Iterable<String>> view1;
+ private PCollectionView<Iterable<String>> view2;
+
+ @Before
+ public void setUp() {
+ PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+ view1 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+ .apply(View.<String>asIterable());
+ view2 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+ .apply(View.<String>asIterable());
+ }
@Test
public void testIsEmpty() {
@@ -113,7 +110,9 @@ public class SideInputHandlerTest {
// add a value for view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
// now side input should be ready
assertTrue(sideInputHandler.isReady(view1, firstWindow));
@@ -139,16 +138,20 @@ public class SideInputHandlerTest {
// add a first value for view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), window));
- Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, window), contains("Hello"));
// subsequent values should replace existing values
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window));
+ valuesInWindow(
+ materializeValuesFor(View.asIterable(), "Ciao", "Buongiorno"),
+ new Instant(0), window));
- Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
+ assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno"));
}
@Test
@@ -166,19 +169,21 @@ public class SideInputHandlerTest {
// add a first value for view1 in the first window
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
// add something for second window of view1
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Arrivederci"),
+ new Instant(0), secondWindow));
- Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
+ assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci"));
// contents for first window should be unaffected
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
@Test
@@ -194,9 +199,10 @@ public class SideInputHandlerTest {
// add value for view1 in the first window
sideInputHandler.addSideInputValue(
view1,
- valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"),
+ new Instant(0), firstWindow));
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
// view2 should not have any data
assertFalse(sideInputHandler.isReady(view2, firstWindow));
@@ -204,18 +210,19 @@ public class SideInputHandlerTest {
// also add some data for view2
sideInputHandler.addSideInputValue(
view2,
- valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow));
+ valuesInWindow(materializeValuesFor(View.asIterable(), "Salut"),
+ new Instant(0), firstWindow));
assertTrue(sideInputHandler.isReady(view2, firstWindow));
- Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
+ assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut"));
// view1 should not be affected by that
- Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
+ assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello"));
}
@SuppressWarnings({"unchecked", "rawtypes"})
private WindowedValue<Iterable<?>> valuesInWindow(
- Iterable<?> values, Instant timestamp, BoundedWindow window) {
+ List<Object> values, Instant timestamp, BoundedWindow window) {
return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 43da92f..ea8f168 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
@@ -35,11 +36,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -60,6 +68,16 @@ class SideInputContainer {
*/
public static SideInputContainer create(
final EvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+ for (PCollectionView<?> pCollectionView : containedViews) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ pCollectionView.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ pCollectionView.getViewFn().getMaterialization().getUrn(),
+ pCollectionView.getTagInternal().getId());
+ }
LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>>
viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context));
return new SideInputContainer(containedViews, viewByWindows);
@@ -239,11 +257,21 @@ class SideInputContainer {
"calling get() on PCollectionView %s that is not ready in window %s",
view,
window);
- // Safe covariant cast
- @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values =
- (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view,
- window)).get();
- return view.getViewFn().apply(values);
+ // Safe covariant cast since we know that the view only contains KVs.
+ @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform(
+ (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked(
+ PCollectionViewWindow.of(view, window)).get(),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ });
+
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ return viewFn.apply(
+ InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index cc9ce60..0a1ffe7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
@@ -28,7 +29,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.SideInputReader;
@@ -41,8 +41,10 @@ import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -126,33 +128,47 @@ public class EvaluationContextTest {
@Test
public void writeToViewWriterThenReadReads() {
- PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+ PCollectionViewWriter<?, Iterable<Integer>> viewWriter =
context.createPCollectionViewWriter(
PCollection.createPrimitiveOutputInternal(
p,
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED,
- IterableCoder.of(VarIntCoder.of())),
+ IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))),
view);
BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
- WindowedValue<Integer> firstValue =
- WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Integer> secondValue =
- WindowedValue.of(
- 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
- Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
- viewWriter.add(values);
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 1)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1222),
+ window,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 2)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(8766L),
+ second,
+ PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+ }
+ viewWriter.add((Iterable) valuesBuilder.build());
SideInputReader reader =
context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
assertThat(reader.get(view, window), containsInAnyOrder(1));
assertThat(reader.get(view, second), containsInAnyOrder(2));
- WindowedValue<Integer> overrittenSecondValue =
- WindowedValue.of(
- 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
- viewWriter.add(Collections.singleton(overrittenSecondValue));
+ ImmutableList.Builder<WindowedValue<?>> overwrittenValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 4444)) {
+ overwrittenValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(8677L),
+ second,
+ PaneInfo.createPane(false, true, Timing.LATE, 1, 1)));
+ }
+ viewWriter.add((Iterable) overwrittenValuesBuilder.build());
assertThat(reader.get(view, second), containsInAnyOrder(2));
// The cached value is served in the earlier reader
reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
index 5e7c799..91255e0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
@@ -34,8 +35,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Mean;
@@ -49,9 +48,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
@@ -134,13 +131,22 @@ public class SideInputContainerTest {
@Test
public void getAfterWriteReturnsPaneInWindow() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, valuesBuilder.build());
Map<String, Integer> viewContents =
container
@@ -153,19 +159,22 @@ public class SideInputContainerTest {
@Test
public void getReturnsLatestPaneInWindow() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1),
- new Instant(1L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2),
- new Instant(20L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ container.write(mapView, valuesBuilder.build());
Map<String, Integer> viewContents =
container
@@ -175,13 +184,15 @@ public class SideInputContainerTest {
assertThat(viewContents, hasEntry("two", 2));
assertThat(viewContents.size(), is(2));
- WindowedValue<KV<String, Integer>> three =
- WindowedValue.of(
- KV.of("three", 3),
- new Instant(300L),
- SECOND_WINDOW,
- PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
+ ImmutableList.Builder<WindowedValue<?>> overwriteValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("three", 3))) {
+ overwriteValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(300L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)));
+ }
+ container.write(mapView, overwriteValuesBuilder.build());
Map<String, Integer> overwrittenViewContents =
container
@@ -209,10 +220,7 @@ public class SideInputContainerTest {
PCollection<KV<String, String>> input =
pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {}));
PCollectionView<Map<String, Iterable<String>>> newView =
- PCollectionViews.multimapView(
- input,
- WindowingStrategy.globalDefault(),
- KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+ input.apply(View.<String, String>asMultimap());
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("unknown views");
@@ -232,19 +240,22 @@ public class SideInputContainerTest {
@Test
public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
- WindowedValue<Double> firstWindowedValue =
- WindowedValue.of(
- 2.875,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Double> secondWindowedValue =
- WindowedValue.of(
- 4.125,
- SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 4.125)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -259,20 +270,15 @@ public class SideInputContainerTest {
@Test
public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
- WindowedValue<Integer> firstValue =
- WindowedValue.of(
- 44,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- WindowedValue<Integer> secondValue =
- WindowedValue.of(
- 44,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
-
- container.write(iterableView, ImmutableList.of(firstValue, secondValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(iterableView, valuesBuilder.build());
assertThat(
container
@@ -283,13 +289,15 @@ public class SideInputContainerTest {
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
- WindowedValue<Double> multiWindowedValue =
- WindowedValue.of(
- 2.875,
- FIRST_WINDOW.maxTimestamp().minus(200L),
- ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
- PaneInfo.ON_TIME_AND_ONLY_FIRING);
- container.write(singletonView, ImmutableList.of(multiWindowedValue));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(200L),
+ ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, valuesBuilder.build());
assertThat(
container
.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
@@ -304,19 +312,22 @@ public class SideInputContainerTest {
@Test
public void finishDoesNotOverwriteWrittenElements() throws Exception {
- WindowedValue<KV<String, Integer>> one =
- WindowedValue.of(
- KV.of("one", 1),
- new Instant(1L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- WindowedValue<KV<String, Integer>> two =
- WindowedValue.of(
- KV.of("two", 2),
- new Instant(20L),
- SECOND_WINDOW,
- PaneInfo.createPane(true, false, Timing.EARLY));
- container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
+ ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(1L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) {
+ valuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ new Instant(20L),
+ SECOND_WINDOW,
+ PaneInfo.createPane(true, false, Timing.EARLY)));
+ }
+ container.write(mapView, valuesBuilder.build());
immediatelyInvokeCallback(mapView, SECOND_WINDOW);
@@ -362,14 +373,15 @@ public class SideInputContainerTest {
*/
@Test
public void isReadyForSomeNotReadyViewsFalseUntilElements() {
- container.write(
- mapView,
- ImmutableList.of(
- WindowedValue.of(
- KV.of("one", 1),
- SECOND_WINDOW.maxTimestamp().minus(100L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> mapValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) {
+ mapValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, mapValuesBuilder.build());
ReadyCheckingSideInputReader reader =
container.createReaderForViews(ImmutableList.of(mapView, singletonView));
@@ -378,25 +390,27 @@ public class SideInputContainerTest {
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
- container.write(
- mapView,
- ImmutableList.of(
- WindowedValue.of(
- KV.of("too", 2),
- FIRST_WINDOW.maxTimestamp().minus(100L),
- FIRST_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> newMapValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("too", 2))) {
+ newMapValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ FIRST_WINDOW.maxTimestamp().minus(100L),
+ FIRST_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(mapView, newMapValuesBuilder.build());
// Cached value is false
assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
- container.write(
- singletonView,
- ImmutableList.of(
- WindowedValue.of(
- 1.25,
- SECOND_WINDOW.maxTimestamp().minus(100L),
- SECOND_WINDOW,
- PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+ ImmutableList.Builder<WindowedValue<?>> singletonValuesBuilder = ImmutableList.builder();
+ for (Object materializedValue : materializeValuesFor(View.asSingleton(), 1.25)) {
+ singletonValuesBuilder.add(WindowedValue.of(
+ materializedValue,
+ SECOND_WINDOW.maxTimestamp().minus(100L),
+ SECOND_WINDOW,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ }
+ container.write(singletonView, singletonValuesBuilder.build());
assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 5bc48b7..3716ec8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -32,11 +32,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -56,10 +56,7 @@ public class ViewEvaluatorFactoryTest {
@Test
public void testInMemoryEvaluator() throws Exception {
PCollection<String> input = p.apply(Create.of("foo", "bar"));
- CreatePCollectionView<String, Iterable<String>> createView =
- CreatePCollectionView.of(
- PCollectionViews.iterableView(
- input, input.getWindowingStrategy(), StringUtf8Coder.of()));
+ PCollectionView<Iterable<String>> pCollectionView = input.apply(View.<String>asIterable());
PCollection<Iterable<String>> concat =
input.apply(WithKeys.<Void, String>of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
@@ -67,11 +64,11 @@ public class ViewEvaluatorFactoryTest {
.apply(Values.<Iterable<String>>create());
PCollection<Iterable<String>> view =
concat.apply(
- new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
+ new ViewOverrideFactory.WriteView<String, Iterable<String>>(pCollectionView));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
- when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter);
+ when(context.createPCollectionViewWriter(concat, pCollectionView)).thenReturn(viewWriter);
CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now());
AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94d8d70..556cac5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -59,8 +59,7 @@ public class ViewOverrideFactoryTest implements Serializable {
@Test
public void replacementGetViewReturnsOriginal() {
final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
- final PCollectionView<List<Integer>> view =
- PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
+ final PCollectionView<List<Integer>> view = ints.apply(View.<Integer>asList());
PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement =
factory.getReplacementTransform(
AppliedPTransform
@@ -89,7 +88,7 @@ public class ViewOverrideFactoryTest implements Serializable {
// so not asserted one way or the other
assertThat(
replacementView.getTagInternal(),
- equalTo(view.getTagInternal()));
+ equalTo((TupleTag) view.getTagInternal()));
assertThat(
replacementView.getViewFn(),
Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 79a23cc..cffcc5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
-import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
@@ -56,15 +55,14 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -217,9 +215,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
public void keyBasedOnCountFnFewElementsExtraShards() throws Exception {
long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3;
PCollection<Long> inputCount = p.apply(Create.of(countValue));
- PCollectionView<Long> elementCountView =
- PCollectionViews.singletonView(
- inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of());
+ PCollectionView<Long> elementCountView = inputCount.apply(
+ View.<Long>asSingleton().withDefaultValue(countValue));
CalculateShardsFn fn = new CalculateShardsFn(3);
DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index cec01f8..aa5cc39 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -307,7 +307,6 @@ class FlinkStreamingTransformTranslators {
intToViewMapping.put(count, sideInput);
tagToIntMapping.put(tag, count);
count++;
- Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index f275290..fb3f375 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collections;
@@ -24,8 +25,10 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -35,6 +38,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
* A {@link SideInputReader} for the Flink Batch Runner.
*/
public class FlinkSideInputReader implements SideInputReader {
+ /** A {@link MultimapView} which always returns an empty iterable. */
+ private static final MultimapView EMPTY_MULTMAP_VIEW = new MultimapView() {
+ @Override
+ public Iterable get(Object o) {
+ return Collections.EMPTY_LIST;
+ }
+ };
private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -42,6 +52,16 @@ public class FlinkSideInputReader implements SideInputReader {
public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
RuntimeContext runtimeContext) {
+ for (PCollectionView<?> view : indexByView.keySet()) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn(),
+ view.getTagInternal().getId());
+ }
sideInputs = new HashMap<>();
for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
@@ -53,7 +73,7 @@ public class FlinkSideInputReader implements SideInputReader {
@Override
public <T> T get(PCollectionView<T> view, BoundedWindow window) {
checkNotNull(view, "View passed to sideInput cannot be null");
- TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+ TupleTag<?> tag = view.getTagInternal();
checkNotNull(
sideInputs.get(tag),
"Side input for " + view + " not available.");
@@ -63,7 +83,8 @@ public class FlinkSideInputReader implements SideInputReader {
tag.getId(), new SideInputInitializer<>(view));
T result = sideInputs.get(window);
if (result == null) {
- result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ result = viewFn.apply(EMPTY_MULTMAP_VIEW);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
index 12222b4..782f72a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -17,12 +17,23 @@
*/
package org.apache.beam.runners.flink.translation.functions;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
@@ -30,24 +41,33 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
* {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
* from window to side input.
*/
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
- implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+public class SideInputInitializer<ViewT>
+ implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> {
PCollectionView<ViewT> view;
public SideInputInitializer(PCollectionView<ViewT> view) {
+ checkArgument(
+ Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "This handler is only capable of dealing with %s materializations "
+ + "but was asked to handle %s for PCollectionView with tag %s.",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn(),
+ view.getTagInternal().getId());
this.view = view;
}
@Override
public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
- Iterable<WindowedValue<ElemT>> inputValues) {
+ Iterable<WindowedValue<?>> inputValues) {
// first partition into windows
- Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
- for (WindowedValue<ElemT> value: inputValues) {
+ Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+ for (WindowedValue<KV<?, ?>> value
+ : (Iterable<WindowedValue<KV<?, ?>>>) (Iterable) inputValues) {
for (BoundedWindow window: value.getWindows()) {
- List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+ List<WindowedValue<KV<?, ?>>> windowedValues = partitionedElements.get(window);
if (windowedValues == null) {
windowedValues = new ArrayList<>();
partitionedElements.put(window, windowedValues);
@@ -58,14 +78,20 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
- for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+ for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements:
partitionedElements.entrySet()) {
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> elementsIterable =
- (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
- resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+ ViewFn<MultimapView, ViewT> viewFn = (ViewFn<MultimapView, ViewT>) view.getViewFn();
+ Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ resultMap.put(elements.getKey(), viewFn.apply(InMemoryMultimapSideInputView.fromIterable(
+ keyCoder,
+ (Iterable) Iterables.transform(elements.getValue(),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ }))));
}
return resultMap;
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad17de8..33ac024f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -47,16 +48,19 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.testing.PCollectionViewTesting;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -71,6 +75,7 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -84,26 +89,19 @@ public class DoFnOperatorTest {
// views and windows for testing side inputs
private static final long WINDOW_MSECS_1 = 100;
private static final long WINDOW_MSECS_2 = 500;
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1)));
-
- private PCollectionView<Iterable<String>> view1 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy1);
-
- private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 =
- WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2)));
-
- private PCollectionView<Iterable<String>> view2 =
- PCollectionViewTesting.testingView(
- new TupleTag<Iterable<WindowedValue<String>>>() {},
- new PCollectionViewTesting.IdentityViewFn<String>(),
- StringUtf8Coder.of(),
- windowingStrategy2);
+ private PCollectionView<Iterable<String>> view1;
+ private PCollectionView<Iterable<String>> view2;
+
+ @Before
+ public void setUp() {
+ PCollection<String> pc = Pipeline.create().apply(Create.of("1"));
+ view1 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1))))
+ .apply(View.<String>asIterable());
+ view2 = pc
+ .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2))))
+ .apply(View.<String>asIterable());
+ }
@Test
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 81e7a97..cc43e27 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(9, steps.size());
+ assertEquals(10, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(8);
+ Step collectionToSingletonStep = steps.get(9);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(3, steps.size());
+ assertEquals(4, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(2);
+ Step collectionToSingletonStep = steps.get(3);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7cb8628..68e3e3c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -40,6 +40,7 @@ import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Combine;
@@ -527,7 +528,11 @@ public final class TransformTranslator {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
PCollectionView<WriteT> output = transform.getView();
- Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+ Coder<Iterable<WindowedValue<?>>> coderInternal =
+ (Coder) IterableCoder.of(
+ WindowedValue.getFullCoder(
+ output.getCoderInternal(),
+ output.getWindowingStrategyInternal().getWindowFn().windowCoder()));
@SuppressWarnings("unchecked")
Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;