You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:30 UTC
[15/50] [abbrv] beam git commit: Expand all PValues to component
PCollections always
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 6c385d7..1853248 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView;
class StreamingViewOverrides {
static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
extends SingleInputOutputOverrideFactory<
- PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
+ PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
@Override
- public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
+ public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
+ PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
transform) {
StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
new StreamingCreatePCollectionView<>(transform.getTransform().getView());
@@ -56,7 +56,7 @@ class StreamingViewOverrides {
}
private static class StreamingCreatePCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
private final PCollectionView<ViewT> view;
private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
@@ -64,7 +64,7 @@ class StreamingViewOverrides {
}
@Override
- public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ public PCollection<ElemT> expand(PCollection<ElemT> input) {
return input
.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 89dc2d5..53215f6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -920,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(5, steps.size());
+ assertEquals(9, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(4);
+ Step collectionToSingletonStep = steps.get(8);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 64aa35a..ac5e0cd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -508,50 +508,6 @@ public final class TransformTranslator {
};
}
- private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
- return new TransformEvaluator<View.AsSingleton<T>>() {
- @Override
- public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
- Iterable<? extends WindowedValue<?>> iter =
- context.getWindowedValues(context.getInput(transform));
- PCollectionView<T> output = context.getOutput(transform);
- Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
-
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
-
- context.putPView(output, iterCast, coderInternal);
- }
-
- @Override
- public String toNativeString() {
- return "collect()";
- }
- };
- }
-
- private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
- return new TransformEvaluator<View.AsIterable<T>>() {
- @Override
- public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
- Iterable<? extends WindowedValue<?>> iter =
- context.getWindowedValues(context.getInput(transform));
- PCollectionView<Iterable<T>> output = context.getOutput(transform);
- Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
-
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
-
- context.putPView(output, iterCast, coderInternal);
- }
-
- @Override
- public String toNativeString() {
- return "collect()";
- }
- };
- }
-
private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
createPCollView() {
return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@@ -560,7 +516,7 @@ public final class TransformTranslator {
EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- PCollectionView<WriteT> output = context.getOutput(transform);
+ PCollectionView<WriteT> output = transform.getView();
Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
@SuppressWarnings("unchecked")
@@ -645,8 +601,8 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
EVALUATORS.put(Create.Values.class, create());
- EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
- EVALUATORS.put(View.AsIterable.class, viewAsIter());
+// EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
+// EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
EVALUATORS.put(Window.Assign.class, window());
EVALUATORS.put(Reshuffle.class, reshuffle());
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 2f0e8ef..ee1ce7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -24,10 +24,12 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,6 +41,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
@@ -165,7 +168,7 @@ public class TransformHierarchy {
* nodes.
*/
public void setOutput(POutput output) {
- for (PValue value : output.expand().values()) {
+ for (PCollection<?> value : fullyExpand(output).values()) {
if (!producers.containsKey(value)) {
producers.put(value, current);
value.finishSpecifyingOutput(
@@ -226,6 +229,58 @@ public class TransformHierarchy {
return current;
}
+ /**
+ * Recursively flattens {@code output} into its component {@link PCollection PCollections},
+ * keyed by the {@link TupleTag} under which each one appears in an expansion.
+ *
+ * @param output the output to flatten
+ * @return the component collections, in the order they were encountered
+ * @throws IllegalArgumentException if two components share the same {@link TupleTag}
+ * @throws IllegalStateException if a non-{@link PCollection} value expands only to itself
+ */
+ private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) {
+ Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
+ for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) {
+ if (value.getValue() instanceof PCollection) {
+ PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue());
+ checkArgument(
+ previous == null,
+ "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
+ TupleTag.class.getSimpleName(),
+ output,
+ value.getKey(),
+ previous,
+ value.getValue());
+ } else {
+ // A value whose sole expansion is itself would make the recursion below never terminate.
+ if (value.getValue().expand().size() == 1
+ && Iterables.getOnlyElement(value.getValue().expand().values())
+ .equals(value.getValue())) {
+ throw new IllegalStateException(
+ String.format(
+ "Non %s %s that expands into itself %s",
+ PCollection.class.getSimpleName(),
+ PValue.class.getSimpleName(),
+ value.getValue()));
+ }
+ // Merge the nested value's components into the result, rejecting duplicate tags.
+ for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent :
+ fullyExpand(value.getValue()).entrySet()) {
+ PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue());
+ checkArgument(
+ previous == null,
+ "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
+ TupleTag.class.getSimpleName(),
+ output,
+ valueComponent.getKey(),
+ previous,
+ valueComponent.getValue());
+ }
+ }
+ }
+ return result;
+ }
+
/**
* Provides internal tracking of transform relationships with helper methods
* for initialization and ordered visitation.
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 9e1cc71..6a90bcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1277,14 +1277,15 @@ public class Combine {
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- return combined.apply(
- CreatePCollectionView.<OutputT, OutputT>of(
- PCollectionViews.singletonView(
- combined,
- input.getWindowingStrategy(),
- insertDefault,
- insertDefault ? fn.defaultValue() : null,
- combined.getCoder())));
+ PCollectionView<OutputT> view =
+ PCollectionViews.singletonView(
+ combined,
+ input.getWindowingStrategy(),
+ insertDefault,
+ insertDefault ? fn.defaultValue() : null,
+ combined.getCoder());
+ combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
+ return view;
}
public int getFanout() {
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 073c750..331b143 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -257,8 +257,10 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
- input, input.getWindowingStrategy(), input.getCoder())));
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
+ input.apply(CreatePCollectionView.<T, List<T>>of(view));
+ return view;
}
}
@@ -282,8 +284,10 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
- input, input.getWindowingStrategy(), input.getCoder())));
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
+ input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
+ return view;
}
}
@@ -423,11 +427,10 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
- PCollectionViews.multimapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder())));
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
+ input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ return view;
}
}
@@ -459,11 +462,10 @@ public class View {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
- return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
- PCollectionViews.mapView(
- input,
- input.getWindowingStrategy(),
- input.getCoder())));
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
+ input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ return view;
}
}
@@ -480,7 +482,7 @@ public class View {
*/
@Internal
public static class CreatePCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+ extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
private PCollectionView<ViewT> view;
private CreatePCollectionView(PCollectionView<ViewT> view) {
@@ -506,8 +508,10 @@ public class View {
}
@Override
- public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
- return view;
+ public PCollection<ElemT> expand(PCollection<ElemT> input) {
+ return PCollection.<ElemT>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+ .setCoder(input.getCoder());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index f210fd8..4063d11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.values;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import java.util.Collections;
+import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
@@ -226,6 +228,11 @@ public class PCollection<T> extends PValueBase implements PValue {
return super.getName();
}
+ @Override
+ public final Map<TupleTag<?>, PValue> expand() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
+ }
+
/**
* Sets the name of this {@link PCollection}. Returns {@code this}.
*
@@ -314,6 +321,11 @@ public class PCollection<T> extends PValueBase implements PValue {
private IsBounded isBounded;
+ /**
+ * A local {@link TupleTag} used in the expansion of this {@link PCollection}.
+ */
+ private final TupleTag<?> tag = new TupleTag<>();
+
private PCollection(Pipeline p) {
super(p);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 74887c7..5e2e2c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -170,6 +170,15 @@ public class PCollectionViews {
}
/**
+ * Returns whether a default value was specified.
+ */
+ @Deprecated
+ @Internal
+ public boolean hasDefault() {
+ return hasDefault;
+ }
+
+ /**
* Returns the default value that was specified.
*
* <p>For internal use only.
@@ -491,5 +500,10 @@ public class PCollectionViews {
public String toString() {
return MoreObjects.toStringHelper(this).add("tag", tag).toString();
}
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 6f638d7..f312eac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.values;
import static com.google.common.base.Preconditions.checkState;
-import java.util.Collections;
-import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.PTransform;
@@ -87,11 +85,6 @@ public abstract class PValueBase implements PValue {
private String name;
/**
- * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
- */
- private TupleTag<?> tag = new TupleTag<>();
-
- /**
* Whether this {@link PValueBase} has been finalized, and its core
* properties, e.g., name, can no longer be changed.
*/
@@ -108,11 +101,6 @@ public abstract class PValueBase implements PValue {
}
@Override
- public final Map<TupleTag<?>, PValue> expand() {
- return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
- }
-
- @Override
public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
finishedSpecifying = true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index adf27f8..aaf8b91 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -22,7 +22,9 @@ import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValueBase;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -349,5 +352,10 @@ public final class PCollectionViewTesting {
.add("viewFn", viewFn)
.toString();
}
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
+ }
}
}