You are viewing a plain-text version of this content. The canonical link for it ("here" on the original archive page) was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:25 UTC
[34/50] [abbrv] beam git commit: [BEAM-2926] Migrate to using a
trivial multimap materialization within the Java SDK.
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
index 6c91088..932ccd6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java
@@ -20,11 +20,17 @@ package org.apache.beam.runners.spark.util;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
+import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -34,7 +40,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
/**
- * A {@link SideInputReader} for thw SparkRunner.
+ * A {@link SideInputReader} for the SparkRunner.
*/
public class SparkSideInputReader implements SideInputReader {
private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
@@ -60,26 +66,30 @@ public class SparkSideInputReader implements SideInputReader {
//--- match the appropriate sideInput window.
// a tag will point to all matching sideInputs, that is all windows.
// now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it.
- Iterable<WindowedValue<?>> availableSideInputs =
- (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
- Iterable<WindowedValue<?>> sideInputForWindow =
- Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
- @Override
- public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
- if (sideInputCandidate == null) {
- return false;
- }
- // first match of a sideInputWindow to the elementWindow is good enough.
- for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
- if (sideInputCandidateWindow.equals(sideInputWindow)) {
- return true;
+ Iterable<WindowedValue<KV<?, ?>>> availableSideInputs =
+ (Iterable<WindowedValue<KV<?, ?>>>) windowedBroadcastHelper.getValue().getValue();
+ Iterable<KV<?, ?>> sideInputForWindow =
+ Iterables.transform(
+ Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+ @Override
+ public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+ if (sideInputCandidate == null) {
+ return false;
+ }
+ return Iterables.contains(sideInputCandidate.getWindows(), sideInputWindow);
}
- }
- // no match found.
- return false;
- }
- });
- return view.getViewFn().apply(sideInputForWindow);
+ }),
+ new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() {
+ @Override
+ public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) {
+ return windowedValue.getValue();
+ }
+ });
+
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ return viewFn.apply(
+ InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) sideInputForWindow));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 3c5b55b..f86e9cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1274,14 +1275,16 @@ public class Combine {
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- PCollectionView<OutputT> view =
- PCollectionViews.singletonView(
- combined,
+ PCollection<KV<Void, OutputT>> materializationInput =
+ combined.apply(new VoidKeyToMultimapMaterialization<OutputT>());
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ materializationInput,
input.getWindowingStrategy(),
insertDefault,
insertDefault ? fn.defaultValue() : null,
- combined.getCoder());
- combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+ combined.getCoder());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, OutputT>, OutputT>of(view));
return view;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 6168710..d71f0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -602,7 +601,24 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return windowValue;
}
}
- return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ // Fallback to returning the default materialization if no data was supplied.
+ // This is really to support singleton views with default values.
+
+ // TODO: Update this to supply a materialization dependent on actual URN of materialization.
+ // Currently the SDK only supports the multimap materialization and it expects a
+ // mapping function.
+ checkState(Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ view.getViewFn().getMaterialization().getUrn()),
+ "Only materializations of type %s supported, received %s",
+ Materializations.MULTIMAP_MATERIALIZATION_URN,
+ view.getViewFn().getMaterialization().getUrn());
+ return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn()).apply(
+ new Materializations.MultimapView<Object, Object>() {
+ @Override
+ public Iterable<Object> get(Object o) {
+ return Collections.emptyList();
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
index 6e4f83d..e606919 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java
@@ -21,7 +21,6 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
@@ -32,29 +31,37 @@ import org.apache.beam.sdk.util.WindowedValue;
@Internal
public class Materializations {
/**
- * The URN for a {@link Materialization} where the primitive view type is an iterable of fully
+ * The URN for a {@link Materialization} where the primitive view type is an multimap of fully
* specified windowed values.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static final String ITERABLE_MATERIALIZATION_URN =
- "urn:beam:sideinput:materialization:iterable:0.1";
+ public static final String MULTIMAP_MATERIALIZATION_URN =
+ "urn:beam:sideinput:materialization:multimap:0.1";
+
+ /**
+ * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to
+ * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}.
+ */
+ public interface MultimapView<K, V> {
+ Iterable<V> get(K k);
+ }
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
- * <p>A {@link Materialization} where the primitive view type is an iterable of fully specified
- * windowed values.
+ * <p>A {@link Materialization} where the primitive view type is a multimap with fully
+ * specified windowed keys.
*/
@Internal
- public static <T> Materialization<Iterable<WindowedValue<T>>> iterable() {
- return new IterableMaterialization<>();
+ public static <K, V> Materialization<MultimapView<K, V>> multimap() {
+ return new MultimapMaterialization<>();
}
- private static class IterableMaterialization<T>
- implements Materialization<Iterable<WindowedValue<T>>> {
+ private static class MultimapMaterialization<K, V>
+ implements Materialization<MultimapView<K, V>> {
@Override
public String getUrn() {
- return ITERABLE_MATERIALIZATION_URN;
+ return MULTIMAP_MATERIALIZATION_URN;
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index eaa7925..ec8233e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
@@ -258,9 +260,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<List<T>> view =
- PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<T, List<T>>of(view));
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<T>());
+ PCollectionView<List<T>> view = PCollectionViews.listView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, T>, List<T>>of(view));
return view;
}
}
@@ -285,9 +291,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+ PCollection<KV<Void, T>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<T>());
+ PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, T>, Iterable<T>>of(view));
return view;
}
}
@@ -428,9 +438,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+ PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, Iterable<V>>>of(view));
return view;
}
}
@@ -463,9 +477,13 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
- input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ PCollection<KV<Void, KV<K, V>>> materializationInput =
+ input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>());
+ PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
+ materializationInput,
+ materializationInput.getWindowingStrategy());
+ materializationInput.apply(
+ CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, V>>of(view));
return view;
}
}
@@ -474,6 +492,31 @@ public class View {
// Internal details below
/**
+ * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+ *
+ * <p>TODO: Replace this materialization with specializations that optimize the various SDK
+ * requested views.
+ */
+ @Internal
+ static class VoidKeyToMultimapMaterialization<T>
+ extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+ private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+ @ProcessElement
+ public void processElement(ProcessContext ctxt) {
+ ctxt.output(KV.of((Void) null, ctxt.element()));
+ }
+ }
+
+ @Override
+ public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+ PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+ output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+ return output;
+ }
+ }
+
+ /**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Creates a primitive {@link PCollectionView}.
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
index d51a917..9291bc6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView;
* {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views
* available in the SDK.
*
- * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner
- * {@code <ViewT>} the type of the value(s) accessible via this {@link PCollectionView}
+ * @param <PrimitiveViewT> the type of the underlying primitive view required
+ * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView}
*/
@Internal
public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
@@ -49,5 +49,5 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable {
/**
* A function to adapt a primitive view type to a desired view type.
*/
- public abstract ViewT apply(PrimitiveViewT contents);
+ public abstract ViewT apply(PrimitiveViewT primitiveViewT);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
index 7d87412..c212c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* A {@link PCollectionView PCollectionView<T>} is an immutable view of a {@link PCollection}
@@ -72,7 +71,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- TupleTag<Iterable<WindowedValue<?>>> getTagInternal();
+ TupleTag<?> getTagInternal();
/**
* <b>For internal use only.</b>
@@ -83,7 +82,7 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- ViewFn<Iterable<WindowedValue<?>>, T> getViewFn();
+ ViewFn<?, T> getViewFn();
/**
* <b>For internal use only.</b>
@@ -116,5 +115,5 @@ public interface PCollectionView<T> extends PValue, Serializable {
*/
@Deprecated
@Internal
- Coder<Iterable<WindowedValue<?>>> getCoderInternal();
+ Coder<?> getCoderInternal();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index ed8fb76..30277f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -17,14 +17,13 @@
*/
package org.apache.beam.sdk.values;
-import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,16 +35,15 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
@@ -56,88 +54,79 @@ import org.apache.beam.sdk.util.WindowedValue;
public class PCollectionViews {
/**
- * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided
- * {@link Coder} and windowed using the provided * {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<T>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*
* <p>If {@code hasDefault} is {@code true}, then the view will take on the value
* {@code defaultValue} for any empty windows.
*/
public static <T, W extends BoundedWindow> PCollectionView<T> singletonView(
- PCollection<T> pCollection,
+ PCollection<KV<Void, T>> pCollection,
WindowingStrategy<?, W> windowingStrategy,
boolean hasDefault,
@Nullable T defaultValue,
- Coder<T> valueCoder) {
+ Coder<T> defaultValueCoder) {
return new SimplePCollectionView<>(
pCollection,
- new SingletonViewFn<>(hasDefault, defaultValue, valueCoder),
+ new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
+ PCollection<KV<Void, T>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableViewFn<T>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
- PCollection<T> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<T> valueCoder) {
+ PCollection<KV<Void, T>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new ListViewFn<T>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the
- * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
+ PCollection<KV<Void, KV<K, V>>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MapViewFn<K, V>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
- * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded
- * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}.
+ * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed
+ * using the provided {@link WindowingStrategy}.
*/
public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView(
- PCollection<KV<K, V>> pCollection,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<KV<K, V>> valueCoder) {
+ PCollection<KV<Void, KV<K, V>>> pCollection,
+ WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new MultimapViewFn<K, V>(),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
/**
@@ -153,18 +142,15 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
+ * Implementation which is able to adapt a multimap materialization to a {@code T}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#singletonView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> {
+ public static class SingletonViewFn<T>
+ extends ViewFn<MultimapView<Void, T>, T> {
@Nullable private byte[] encodedDefaultValue;
@Nullable private transient T defaultValue;
@Nullable private Coder<T> valueCoder;
@@ -204,9 +190,12 @@ public class PCollectionViews {
}
// Lazily decode the default value once
synchronized (this) {
- if (encodedDefaultValue != null && defaultValue == null) {
+ if (encodedDefaultValue != null) {
try {
defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue);
+ // Clear the encoded default value to free the reference once we have the object
+ // version. Also, this will guarantee that the value will only be decoded once.
+ encodedDefaultValue = null;
} catch (IOException e) {
throw new RuntimeException("Unexpected IOException: ", e);
}
@@ -216,84 +205,67 @@ public class PCollectionViews {
}
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public T apply(Iterable<WindowedValue<T>> contents) {
+ public T apply(MultimapView<Void, T> primitiveViewT) {
try {
- return Iterables.getOnlyElement(contents).getValue();
+ return Iterables.getOnlyElement(primitiveViewT.get(null));
} catch (NoSuchElementException exc) {
return getDefaultValue();
} catch (IllegalArgumentException exc) {
throw new IllegalArgumentException(
- "PCollection with more than one element "
- + "accessed as a singleton view.");
+ "PCollection with more than one element accessed as a singleton view.");
}
}
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code Iterable<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#iterableView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class IterableViewFn<T>
- extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends ViewFn<MultimapView<Void, T>, Iterable<T>> {
+
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
- return Iterables.unmodifiableIterable(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
- }
- }));
+ public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) {
+ return Iterables.unmodifiableIterable(primitiveViewT.get(null));
}
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
* <p>For internal use only.
*
* <p>Instantiate via {@link PCollectionViews#listView}.
- *
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> {
+ public static class ListViewFn<T>
+ extends ViewFn<MultimapView<Void, T>, List<T>> {
@Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, T>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public List<T> apply(Iterable<WindowedValue<T>> contents) {
- return ImmutableList.copyOf(
- Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @SuppressWarnings("unchecked")
- @Override
- public T apply(WindowedValue<T> input) {
- return input.getValue();
+ public List<T> apply(MultimapView<Void, T> primitiveViewT) {
+ List<T> list = new ArrayList<>();
+ for (T t : primitiveViewT.get(null)) {
+ list.add(t);
}
- }));
+ return Collections.unmodifiableList(list);
}
@Override
@@ -308,27 +280,29 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>}
- * to {@code Map<K, Iterable<V>>}.
+ * Implementation which is able to adapt a multimap materialization to a
+ * {@code Map<K, Iterable<V>>}.
+ *
+ * <p>For internal use only.
*
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
+ * <p>Instantiate via {@link PCollectionViews#multimapView}.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static class MultimapViewFn<K, V>
- extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> {
+ extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> {
@Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+ return Materializations.multimap();
}
@Override
- public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ public Map<K, Iterable<V>> apply(
+ MultimapView<Void, KV<K, V>> primitiveViewT) {
+ // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+ // using structural value equality.
Multimap<K, V> multimap = HashMultimap.create();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- multimap.put(kv.getKey(), kv.getValue());
+ for (KV<K, V> elem : primitiveViewT.get(null)) {
+ multimap.put(elem.getKey(), elem.getValue());
}
// Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -338,32 +312,31 @@ public class PCollectionViews {
}
/**
- * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to
- * {@code Map<K, V>}.
+ * Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}.
+ *
+ * <p>For internal use only.
*
- * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive
- * view type.
+ * <p>Instantiate via {@link PCollectionViews#mapView}.
*/
- @Deprecated
@Experimental(Kind.CORE_RUNNERS_ONLY)
- public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> {
+ public static class MapViewFn<K, V>
+ extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> {
+
@Override
- public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() {
- return Materializations.iterable();
+ public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() {
+ return Materializations.multimap();
}
- /**
- * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}.
- */
@Override
- public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) {
+ public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) {
+ // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are
+ // using structural value equality.
Map<K, V> map = new HashMap<>();
- for (WindowedValue<KV<K, V>> elem : elements) {
- KV<K, V> kv = elem.getValue();
- if (map.containsKey(kv.getKey())) {
- throw new IllegalArgumentException("Duplicate values for " + kv.getKey());
+ for (KV<K, V> elem : primitiveViewT.get(null)) {
+ if (map.containsKey(elem.getKey())) {
+ throw new IllegalArgumentException("Duplicate values for " + elem.getKey());
}
- map.put(kv.getKey(), kv.getValue());
+ map.put(elem.getKey(), elem.getValue());
}
return Collections.unmodifiableMap(map);
}
@@ -375,14 +348,14 @@ public class PCollectionViews {
*
* <p>For internal use only.
*/
- public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow>
+ public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow>
extends PValueBase
implements PCollectionView<ViewT> {
/** The {@link PCollection} this view was originally created from. */
private transient PCollection<ElemT> pCollection;
/** A unique tag for the view, typed according to the elements underlying the view. */
- private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
+ private TupleTag<PrimitiveViewT> tag;
private WindowMappingFn<W> windowMappingFn;
@@ -390,12 +363,12 @@ public class PCollectionViews {
private WindowingStrategy<?, W> windowingStrategy;
/** The coder for the elements underlying the view. */
- private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder;
+ private @Nullable Coder<ElemT> coder;
/**
* The typed {@link ViewFn} for this view.
*/
- private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
+ private ViewFn<PrimitiveViewT, ViewT> viewFn;
/**
* Call this constructor to initialize the fields for which this base class provides
@@ -403,11 +376,10 @@ public class PCollectionViews {
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ TupleTag<PrimitiveViewT> tag,
+ ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
+ WindowingStrategy<?, W> windowingStrategy) {
super(pCollection.getPipeline());
this.pCollection = pCollection;
if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
@@ -417,9 +389,7 @@ public class PCollectionViews {
this.tag = tag;
this.windowingStrategy = windowingStrategy;
this.viewFn = viewFn;
- this.coder =
- IterableCoder.of(WindowedValue.getFullCoder(
- valueCoder, windowingStrategy.getWindowFn().windowCoder()));
+ this.coder = pCollection.getCoder();
}
/**
@@ -428,27 +398,20 @@ public class PCollectionViews {
*/
private SimplePCollectionView(
PCollection<ElemT> pCollection,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
+ ViewFn<PrimitiveViewT, ViewT> viewFn,
WindowMappingFn<W> windowMappingFn,
- WindowingStrategy<?, W> windowingStrategy,
- Coder<ElemT> valueCoder) {
+ WindowingStrategy<?, W> windowingStrategy) {
this(
pCollection,
- new TupleTag<Iterable<WindowedValue<ElemT>>>(),
+ new TupleTag<PrimitiveViewT>(),
viewFn,
windowMappingFn,
- windowingStrategy,
- valueCoder);
+ windowingStrategy);
}
@Override
- public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
- // Safe cast: it is required that the rest of the SDK maintain the invariant
- // that a PCollectionView is only provided an iterable for the elements of an
- // appropriately typed PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
- return untypedViewFn;
+ public ViewFn<PrimitiveViewT, ViewT> getViewFn() {
+ return viewFn;
}
@Override
@@ -467,13 +430,8 @@ public class PCollectionViews {
* <p>For internal use only by runner implementors.
*/
@Override
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
- // Safe cast: It is required that the rest of the SDK maintain the invariant that
- // this tag is only used to access the contents of an appropriately typed underlying
- // PCollection
- @SuppressWarnings({"rawtypes", "unchecked"})
- TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag;
- return untypedTag;
+ public TupleTag<?> getTagInternal() {
+ return tag;
}
/**
@@ -488,12 +446,8 @@ public class PCollectionViews {
}
@Override
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
- // Safe cast: It is required that the rest of the SDK only use this untyped coder
- // for the elements of an appropriately typed underlying PCollection.
- @SuppressWarnings({"rawtypes", "unchecked"})
- Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder;
- return untypedCoder;
+ public Coder<?> getCoderInternal() {
+ return coder;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index aaf8b91..e7fd9b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -18,344 +18,57 @@
package org.apache.beam.sdk.testing;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.transforms.Materialization;
-import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
/**
- * Methods for creating and using {@link PCollectionView} instances.
+ * Methods for testing {@link PCollectionView}s.
*/
public final class PCollectionViewTesting {
-
- // Do not instantiate; static methods only
- private PCollectionViewTesting() { }
-
- /**
- * The length of the default window, which is an {@link IntervalWindow}, but kept encapsulated
- * as it is not for the user to know what sort of window it is.
- */
- private static final long DEFAULT_WINDOW_MSECS = 1000 * 60 * 60;
-
- /**
- * A default windowing strategy. Tests that are not concerned with the windowing
- * strategy should not specify it, and all views will use this.
- */
- public static final WindowingStrategy<?, ?> DEFAULT_WINDOWING_STRATEGY =
- WindowingStrategy.of(FixedWindows.of(new Duration(DEFAULT_WINDOW_MSECS)));
-
- /**
- * A default window into which test elements will be placed, if the window is
- * not explicitly overridden.
- */
- public static final BoundedWindow DEFAULT_NONEMPTY_WINDOW =
- new IntervalWindow(new Instant(0), new Instant(DEFAULT_WINDOW_MSECS));
-
- /**
- * A timestamp in the {@link #DEFAULT_NONEMPTY_WINDOW}.
- */
- public static final Instant DEFAULT_TIMESTAMP = DEFAULT_NONEMPTY_WINDOW.maxTimestamp().minus(1);
-
- /**
- * A window into which no element will be placed by methods in this class, unless explicitly
- * requested.
- */
- public static final BoundedWindow DEFAULT_EMPTY_WINDOW = new IntervalWindow(
- DEFAULT_NONEMPTY_WINDOW.maxTimestamp(),
- DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS));
-
- /**
- * A {@link ViewFn} that returns the provided contents as a fully lazy iterable.
- */
- public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> contents) {
- return Iterables.transform(contents, new Function<WindowedValue<T>, T>() {
- @Override
- public T apply(WindowedValue<T> windowedValue) {
- return windowedValue.getValue();
- }
- });
- }
- }
-
- /**
- * A {@link ViewFn} that traverses the whole iterable eagerly and returns the number of elements.
- *
- * <p>Only for use in testing scenarios with small collections. If there are more elements
- * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable.
- */
- public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> {
- @Override
- public Materialization<Iterable<WindowedValue<T>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public Long apply(Iterable<WindowedValue<T>> contents) {
- return (long) Iterables.size(contents);
- }
- }
-
- /**
- * A {@link ViewFn} that always returns the value with which it is instantiated.
- */
- public static class ConstantViewFn<ElemT, ViewT>
- extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> {
- private ViewT value;
-
- public ConstantViewFn(ViewT value) {
- this.value = value;
- }
-
- @Override
- public Materialization<Iterable<WindowedValue<ElemT>>> getMaterialization() {
- return Materializations.iterable();
- }
-
- @Override
- public ViewT apply(Iterable<WindowedValue<ElemT>> contents) {
- return value;
- }
- }
-
- /**
- * A {@link PCollectionView} explicitly built from a {@link TupleTag}
- * and conversion {@link ViewFn}, and an element coder, using the
- * {@link #DEFAULT_WINDOWING_STRATEGY}.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does
- * not respect the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable. It is recommended
- * that the values be prepared via {@link #contentsInDefaultWindow}.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return testingView(null, tag, viewFn, elemCoder, windowingStrategy);
- }
-
- /**
- * The default {@link Coder} used for windowed values, given an element {@link Coder}.
- */
- public static <T> Coder<WindowedValue<T>> defaultWindowedValueCoder(Coder<T> elemCoder) {
- return WindowedValue.getFullCoder(
- elemCoder, DEFAULT_WINDOWING_STRATEGY.getWindowFn().windowCoder());
- }
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
- * WindowingStrategy}, {@link Coder}, and conversion function.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
- * the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return testingView(
- pCollection,
- tag,
- viewFn,
- windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
- elemCoder,
- windowingStrategy);
- }
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link
- * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}.
- *
- * <p>This method is only recommended for use by runner implementors to test their
- * implementations. It is very easy to construct a {@link PCollectionView} that does not respect
- * the invariants required for proper functioning.
- *
- * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed
- * values provided to the view during execution, results are unpredictable.
- */
- public static <ElemT, ViewT> PCollectionView<ViewT> testingView(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<?> windowMappingFn,
- Coder<ElemT> elemCoder,
- WindowingStrategy<?, ?> windowingStrategy) {
- return new PCollectionViewFromParts<>(
- pCollection,
- tag,
- viewFn,
- windowMappingFn,
- windowingStrategy,
- IterableCoder.of(
- WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder())));
- }
-
- /**
- * Places the given {@code value} in the {@link #DEFAULT_NONEMPTY_WINDOW}.
- */
- public static <T> WindowedValue<T> valueInDefaultWindow(T value) {
- return WindowedValue.of(value, DEFAULT_TIMESTAMP, DEFAULT_NONEMPTY_WINDOW, PaneInfo.NO_FIRING);
- }
-
- /**
- * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
- */
- @SafeVarargs
- public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(T... values)
- throws Exception {
- List<WindowedValue<T>> windowedValues = Lists.newArrayList();
- for (T value : values) {
- windowedValues.add(valueInDefaultWindow(value));
- }
- return windowedValues;
- }
-
- /**
- * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input.
- */
- public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(Iterable<T> values)
- throws Exception {
- List<WindowedValue<T>> windowedValues = Lists.newArrayList();
- for (T value : values) {
- windowedValues.add(valueInDefaultWindow(value));
- }
- return windowedValues;
- }
-
- // Internal details below here
-
- /**
- * A {@link PCollectionView} explicitly built from its {@link TupleTag},
- * {@link WindowingStrategy}, and conversion function.
- *
- * <p>Instantiate via {@link #testingView}.
- */
- private static class PCollectionViewFromParts<ElemT, ViewT>
- extends PValueBase
- implements PCollectionView<ViewT> {
- private PCollection<ElemT> pCollection;
- private TupleTag<Iterable<WindowedValue<ElemT>>> tag;
- private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn;
- private WindowMappingFn<?> windowMappingFn;
- private WindowingStrategy<?, ?> windowingStrategy;
- private Coder<Iterable<WindowedValue<ElemT>>> coder;
-
- public PCollectionViewFromParts(
- PCollection<ElemT> pCollection,
- TupleTag<Iterable<WindowedValue<ElemT>>> tag,
- ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn,
- WindowMappingFn<?> windowMappingFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Coder<Iterable<WindowedValue<ElemT>>> coder) {
- this.pCollection = pCollection;
- this.tag = tag;
- this.viewFn = viewFn;
- this.windowMappingFn = windowMappingFn;
- this.windowingStrategy = windowingStrategy;
- this.coder = coder;
- }
-
- @Override
- public PCollection<?> getPCollection() {
- return pCollection;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() {
- return (TupleTag) tag;
- }
-
- @Override
- public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() {
- // Safe cast; runners must maintain type safety
- @SuppressWarnings({"unchecked", "rawtypes"})
- ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn;
- return untypedViewFn;
- }
-
- @Override
- public WindowMappingFn<?> getWindowMappingFn() {
- return windowMappingFn;
- }
-
- @Override
- public WindowingStrategy<?, ?> getWindowingStrategyInternal() {
- return windowingStrategy;
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
- return (Coder) coder;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(tag);
- }
-
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof PCollectionView)) {
- return false;
+ public static List<Object> materializeValuesFor(
+ PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) {
+ List<Object> rval = new ArrayList<>();
+ // Currently all view materializations are the same where the data is shared underneath
+ // the void/null key. Once this changes, these materializations will differ but test code
+ // should not worry about what these look like if they are relying on the ViewFn to "undo"
+ // the conversion.
+ if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
}
- @SuppressWarnings("unchecked")
- PCollectionView<?> otherView = (PCollectionView<?>) other;
- return tag.equals(otherView.getTagInternal());
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("tag", tag)
- .add("viewFn", viewFn)
- .toString();
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
- }
+ } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+ for (Object value : values) {
+ rval.add(KV.of(null, value));
+ }
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Unknown type of view %s. Supported views are %s.",
+ viewTransformClass.getClass(),
+ ImmutableSet.of(
+ View.AsSingleton.class,
+ View.AsIterable.class,
+ View.AsList.class,
+ View.AsMap.class,
+ View.AsMultimap.class)));
+ }
+ return Collections.unmodifiableList(rval);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 5cb9e18..cff6b2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -37,9 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -297,9 +295,8 @@ public class DoFnTesterTest {
@Test
public void fnWithSideInputDefault() throws Exception {
PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of()));
- final PCollectionView<Integer> value =
- PCollectionViews.singletonView(
- pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+ final PCollectionView<Integer> value = pCollection.apply(
+ View.<Integer>asSingleton().withDefaultValue(0));
try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
tester.processElement(1);
@@ -313,9 +310,8 @@ public class DoFnTesterTest {
@Test
public void fnWithSideInputExplicit() throws Exception {
PCollection<Integer> pCollection = p.apply(Create.of(-2));
- final PCollectionView<Integer> value =
- PCollectionViews.singletonView(
- pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
+ final PCollectionView<Integer> value = pCollection.apply(
+ View.<Integer>asSingleton().withDefaultValue(0));
try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) {
tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1ccd5d6..7d20532 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -209,7 +209,7 @@ class BatchLoads<DestinationT>
checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
final PCollectionView<String> jobIdTokenView = createJobIdView(p);
- final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
// The user-supplied triggeringDuration is often chosen to to control how many BigQuery load
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
// is set to a large value, currently we have to buffer all the data unti the trigger fires.
@@ -295,7 +295,7 @@ class BatchLoads<DestinationT>
public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
Pipeline p = input.getPipeline();
final PCollectionView<String> jobIdTokenView = createJobIdView(p);
- final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+ final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView);
PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
@@ -364,8 +364,10 @@ class BatchLoads<DestinationT>
}
// Generate the temporary-file prefix.
- private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
- return ((PCollection<String>) jobIdView.getPCollection())
+ private PCollectionView<String> createTempFilePrefixView(
+ Pipeline p, final PCollectionView<String> jobIdView) {
+ return p
+ .apply(Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
@@ -382,13 +384,13 @@ class BatchLoads<DestinationT>
resolveTempLocation(
tempLocationRoot,
"BigQueryWriteTemp",
- c.element());
+ c.sideInput(jobIdView));
LOG.info(
"Writing BigQuery temporary files to {} before loading them.",
tempLocation);
c.output(tempLocation);
}
- }))
+ }).withSideInputs(jobIdView))
.apply("TempFilePrefixView", View.<String>asSingleton());
}