You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:44 UTC
[13/50] [abbrv] beam git commit: Remove Orderdness of Input,
Output expansions
Remove Orderdness of Input, Output expansions
This brings the PInput and POutput expansion signatures back in line
with the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e5737fd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e5737fd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e5737fd
Branch: refs/heads/DSL_SQL
Commit: 0e5737fdbee5478ee7f39c4b1a1ac95353ec7b08
Parents: a5a5bf9
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 16:51:55 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Apr 11 12:40:46 2017 -0700
----------------------------------------------------------------------
.../FlattenPCollectionTranslator.java | 13 +-
.../apex/translation/ParDoTranslator.java | 13 +-
.../apex/translation/TranslationContext.java | 10 +-
.../DeduplicatedFlattenFactory.java | 18 ++-
.../EmptyFlattenAsCreateFactory.java | 7 +-
.../core/construction/PTransformMatchers.java | 5 +-
.../core/construction/PrimitiveCreate.java | 7 +-
.../core/construction/ReplacementOutputs.java | 63 ++++-----
.../SingleInputOutputOverrideFactory.java | 11 +-
.../UnsupportedOverrideFactory.java | 8 +-
.../DeduplicatedFlattenFactoryTest.java | 6 +-
.../EmptyFlattenAsCreateFactoryTest.java | 8 +-
.../construction/PTransformMatchersTest.java | 132 +++++++++----------
.../construction/ReplacementOutputsTest.java | 109 ++-------------
.../SingleInputOutputOverrideFactoryTest.java | 6 +-
.../UnsupportedOverrideFactoryTest.java | 7 +-
.../direct/BoundedReadEvaluatorFactory.java | 2 +-
.../beam/runners/direct/DirectGraphVisitor.java | 5 +-
.../direct/ExecutorServiceParallelExecutor.java | 4 +-
.../runners/direct/FlattenEvaluatorFactory.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 2 +-
.../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +-
.../direct/KeyedPValueTrackingVisitor.java | 14 +-
.../runners/direct/ParDoEvaluatorFactory.java | 10 +-
.../direct/ParDoMultiOverrideFactory.java | 9 +-
.../direct/StatefulParDoEvaluatorFactory.java | 8 +-
.../direct/TestStreamEvaluatorFactory.java | 8 +-
.../direct/UnboundedReadEvaluatorFactory.java | 4 +-
.../runners/direct/ViewEvaluatorFactory.java | 4 +-
.../runners/direct/ViewOverrideFactory.java | 9 +-
.../beam/runners/direct/WatermarkManager.java | 19 +--
.../runners/direct/WindowEvaluatorFactory.java | 2 +-
.../direct/WriteWithShardingFactory.java | 10 +-
.../runners/direct/DirectGraphVisitorTest.java | 7 +-
.../beam/runners/direct/ParDoEvaluatorTest.java | 2 +-
.../StatefulParDoEvaluatorFactoryTest.java | 2 +-
.../direct/TestStreamEvaluatorFactoryTest.java | 5 +-
.../runners/direct/ViewOverrideFactoryTest.java | 2 +-
.../flink/FlinkBatchTransformTranslators.java | 36 ++---
.../flink/FlinkBatchTranslationContext.java | 11 +-
.../flink/FlinkStreamingPipelineTranslator.java | 9 +-
.../FlinkStreamingTransformTranslators.java | 32 ++---
.../flink/FlinkStreamingTranslationContext.java | 12 +-
.../dataflow/BatchStatefulParDoOverrides.java | 15 +--
.../dataflow/DataflowPipelineTranslator.java | 20 +--
.../beam/runners/dataflow/DataflowRunner.java | 27 ++--
.../runners/dataflow/TransformTranslator.java | 6 +-
.../dataflow/DataflowPipelineJobTest.java | 7 +-
.../apache/beam/runners/spark/SparkRunner.java | 20 +--
.../beam/runners/spark/TestSparkRunner.java | 7 +-
.../spark/translation/EvaluationContext.java | 11 +-
.../spark/translation/TransformTranslator.java | 25 ++--
.../streaming/StreamingTransformTranslator.java | 20 +--
.../sdk/runners/PTransformOverrideFactory.java | 10 +-
.../beam/sdk/runners/TransformHierarchy.java | 80 +++++------
.../beam/sdk/transforms/AppliedPTransform.java | 17 ++-
.../transforms/join/KeyedPCollectionTuple.java | 12 +-
.../java/org/apache/beam/sdk/values/PBegin.java | 6 +-
.../apache/beam/sdk/values/PCollectionList.java | 27 ++--
.../beam/sdk/values/PCollectionTuple.java | 10 +-
.../java/org/apache/beam/sdk/values/PDone.java | 6 +-
.../java/org/apache/beam/sdk/values/PInput.java | 4 +-
.../org/apache/beam/sdk/values/POutput.java | 4 +-
.../java/org/apache/beam/sdk/values/PValue.java | 4 +-
.../org/apache/beam/sdk/values/PValueBase.java | 6 +-
.../apache/beam/sdk/values/TaggedPValue.java | 5 +
.../java/org/apache/beam/sdk/PipelineTest.java | 20 ++-
.../sdk/runners/TransformHierarchyTest.java | 72 +++++-----
.../beam/sdk/values/PCollectionListTest.java | 70 ++++------
.../beam/sdk/values/PCollectionTupleTest.java | 5 +-
.../beam/sdk/io/gcp/bigquery/WriteResult.java | 11 +-
71 files changed, 526 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 080c5e9..440b801 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -32,7 +32,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* {@link Flatten.PCollections} translation to Apex operator.
@@ -63,15 +64,15 @@ class FlattenPCollectionTranslator<T> implements
}
}
- private List<PCollection<T>> extractPCollections(List<TaggedPValue> inputs) {
+ private List<PCollection<T>> extractPCollections(Map<TupleTag<?>, PValue> inputs) {
List<PCollection<T>> collections = Lists.newArrayList();
- for (TaggedPValue pv : inputs) {
+ for (PValue pv : inputs.values()) {
checkArgument(
- pv.getValue() instanceof PCollection,
+ pv instanceof PCollection,
"Non-PCollection provided as input to flatten: %s of type %s",
- pv.getValue(),
+ pv,
pv.getClass().getSimpleName());
- collections.add((PCollection<T>) pv.getValue());
+ collections.add((PCollection<T>) pv);
}
return collections;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index fa9d21d..9213c1f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
@@ -38,7 +39,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,8 +82,8 @@ class ParDoTranslator<InputT, OutputT>
ApexRunner.class.getSimpleName()));
}
- List<TaggedPValue> outputs = context.getOutputs();
- PCollection<InputT> input = context.getInput();
+ Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+ PCollection<InputT> input = (PCollection<InputT>) context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
@@ -100,7 +101,7 @@ class ParDoTranslator<InputT, OutputT>
context.getStateBackend());
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
- for (TaggedPValue output : outputs) {
+ for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
checkArgument(
output.getValue() instanceof PCollection,
"%s %s outputs non-PCollection %s of type %s",
@@ -109,12 +110,12 @@ class ParDoTranslator<InputT, OutputT>
output.getValue(),
output.getValue().getClass().getSimpleName());
PCollection<?> pc = (PCollection<?>) output.getValue();
- if (output.getTag().equals(transform.getMainOutputTag())) {
+ if (output.getKey().equals(transform.getMainOutputTag())) {
ports.put(pc, operator.output);
} else {
int portIndex = 0;
for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
- if (tag.equals(output.getTag())) {
+ if (tag.equals(output.getKey())) {
ports.put(pc, operator.sideOutputPorts[portIndex]);
break;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 81507ef..c78028e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -42,7 +42,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -85,20 +85,20 @@ class TranslationContext {
return getCurrentTransform().getFullName();
}
- public List<TaggedPValue> getInputs() {
+ public Map<TupleTag<?>, PValue> getInputs() {
return getCurrentTransform().getInputs();
}
public <InputT extends PValue> InputT getInput() {
- return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
+ return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
}
- public List<TaggedPValue> getOutputs() {
+ public Map<TupleTag<?>, PValue> getOutputs() {
return getCurrentTransform().getOutputs();
}
public <OutputT extends PValue> OutputT getOutput() {
- return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
+ return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs().values());
}
private AppliedPTransform<?, ?, ?> getCurrentTransform() {
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
index 093385e..c12c548 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
@@ -19,7 +19,6 @@
package org.apache.beam.runners.core.construction;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} that will apply a flatten where no element appears in the
@@ -46,6 +45,7 @@ public class DeduplicatedFlattenFactory<T>
}
private DeduplicatedFlattenFactory() {}
+
@Override
public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
PCollections<T> transform) {
@@ -75,12 +75,16 @@ public class DeduplicatedFlattenFactory<T>
};
}
+ /**
+ * {@inheritDoc}.
+ *
+ * <p>The input {@link PCollectionList} that is constructed will have the same values in the same
+ */
@Override
- public PCollectionList<T> getInput(
- List<TaggedPValue> inputs, Pipeline p) {
+ public PCollectionList<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
PCollectionList<T> pCollections = PCollectionList.empty(p);
- for (TaggedPValue input : inputs) {
- PCollection<T> pcollection = (PCollection<T>) input.getValue();
+ for (PValue input : inputs.values()) {
+ PCollection<T> pcollection = (PCollection<T>) input;
pCollections = pCollections.and(pcollection);
}
return pCollections;
@@ -88,7 +92,7 @@ public class DeduplicatedFlattenFactory<T>
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollection<T> newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 4328cf3..936bc08 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
@@ -57,7 +56,7 @@ public class EmptyFlattenAsCreateFactory<T>
@Override
public PCollectionList<T> getInput(
- List<TaggedPValue> inputs, Pipeline p) {
+ Map<TupleTag<?>, PValue> inputs, Pipeline p) {
checkArgument(
inputs.isEmpty(),
"Unexpected nonempty input %s for %s",
@@ -68,7 +67,7 @@ public class EmptyFlattenAsCreateFactory<T>
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollection<T> newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 6437f7e..94ec38c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
/**
* A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the
@@ -250,8 +249,8 @@ public class PTransformMatchers {
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (application.getTransform() instanceof Flatten.PCollections) {
Set<PValue> observed = new HashSet<>();
- for (TaggedPValue pvalue : application.getInputs()) {
- boolean firstInstance = observed.add(pvalue.getValue());
+ for (PValue pvalue : application.getInputs().values()) {
+ boolean firstInstance = observed.add(pvalue);
if (!firstInstance) {
return true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
index 7bd38b0..9335f3a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core.construction;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -30,7 +29,7 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* An implementation of {@link Create} that returns a primitive {@link PCollection}.
@@ -63,13 +62,13 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
}
@Override
- public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+ public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
return p.begin();
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollection<T> newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
index 11b4449..3d485ae 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
@@ -21,10 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.values.POutput;
@@ -39,60 +40,40 @@ public class ReplacementOutputs {
private ReplacementOutputs() {}
public static Map<PValue, ReplacementOutput> singleton(
- List<TaggedPValue> original, PValue replacement) {
- TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacement.expand());
- return ImmutableMap.<PValue, ReplacementOutput>builder()
- .put(
- taggedReplacement.getValue(),
- ReplacementOutput.of(Iterables.getOnlyElement(original), taggedReplacement))
- .build();
- }
-
- public static Map<PValue, ReplacementOutput> ordered(
- List<TaggedPValue> original, POutput replacement) {
- ImmutableMap.Builder<PValue, ReplacementOutput> result = ImmutableMap.builder();
- List<TaggedPValue> replacements = replacement.expand();
- checkArgument(
- original.size() == replacements.size(),
- "Original and Replacements must be the same size. Original: %s Replacement: %s",
- original.size(),
- replacements.size());
- int i = 0;
- for (TaggedPValue replacementPvalue : replacements) {
- result.put(
- replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), replacementPvalue));
- i++;
- }
- return result.build();
+ Map<TupleTag<?>, PValue> original, PValue replacement) {
+ Entry<TupleTag<?>, PValue> originalElement = Iterables.getOnlyElement(original.entrySet());
+ TupleTag<?> replacementTag = Iterables.getOnlyElement(replacement.expand().entrySet()).getKey();
+ return Collections.singletonMap(
+ replacement,
+ ReplacementOutput.of(
+ TaggedPValue.of(originalElement.getKey(), originalElement.getValue()),
+ TaggedPValue.of(replacementTag, replacement)));
}
public static Map<PValue, ReplacementOutput> tagged(
- List<TaggedPValue> original, POutput replacement) {
+ Map<TupleTag<?>, PValue> original, POutput replacement) {
Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>();
- for (TaggedPValue value : original) {
- TaggedPValue former = originalTags.put(value.getTag(), value);
- checkArgument(
- former == null || former.equals(value),
- "Found two tags in an expanded output which map to different values: output: %s "
- + "Values: %s and %s",
- original,
- former,
- value);
+ for (Map.Entry<TupleTag<?>, PValue> originalValue : original.entrySet()) {
+ originalTags.put(
+ originalValue.getKey(),
+ TaggedPValue.of(originalValue.getKey(), originalValue.getValue()));
}
ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = ImmutableMap.builder();
Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet());
- for (TaggedPValue replacementValue : replacement.expand()) {
- TaggedPValue mapped = originalTags.get(replacementValue.getTag());
+ for (Map.Entry<TupleTag<?>, PValue> replacementValue : replacement.expand().entrySet()) {
+ TaggedPValue mapped = originalTags.get(replacementValue.getKey());
checkArgument(
mapped != null,
"Missing original output for Tag %s and Value %s Between original %s and replacement %s",
- replacementValue.getTag(),
+ replacementValue.getKey(),
replacementValue.getValue(),
original,
replacement.expand());
resultBuilder.put(
- replacementValue.getValue(), ReplacementOutput.of(mapped, replacementValue));
- missingTags.remove(replacementValue.getTag());
+ replacementValue.getValue(),
+ ReplacementOutput.of(
+ mapped, TaggedPValue.of(replacementValue.getKey(), replacementValue.getValue())));
+ missingTags.remove(replacementValue.getKey());
}
ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build();
checkArgument(
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
index 43bf556..6d0d571 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
@@ -19,17 +19,16 @@
package org.apache.beam.runners.core.construction;
import com.google.common.collect.Iterables;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a
- * {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are
+ * {@link PValue}. {@link #getInput(Map, Pipeline)} and {@link #mapOutputs(Map, PValue)} are
* implemented.
*/
public abstract class SingleInputOutputOverrideFactory<
@@ -38,13 +37,13 @@ public abstract class SingleInputOutputOverrideFactory<
TransformT extends PTransform<InputT, OutputT>>
implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
@Override
- public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
- return (InputT) Iterables.getOnlyElement(inputs).getValue();
+ public final InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+ return (InputT) Iterables.getOnlyElement(inputs.values());
}
@Override
public final Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, OutputT newOutput) {
+ Map<TupleTag<?>, PValue> outputs, OutputT newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
index 38cbd2a..7b9d704 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core.construction;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -26,7 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} that throws an exception when a call to
@@ -60,12 +59,13 @@ public final class UnsupportedOverrideFactory<
}
@Override
- public InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
+ public InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
throw new UnsupportedOperationException(message);
}
@Override
- public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput) {
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, OutputT newOutput) {
throw new UnsupportedOperationException(message);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
index a251f5a..14aa1e6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -106,7 +106,7 @@ public class DeduplicatedFlattenFactoryTest {
Matchers.<PValue, ReplacementOutput>hasEntry(
replacement,
ReplacementOutput.of(
- Iterables.getOnlyElement(original.expand()),
- Iterables.getOnlyElement(replacement.expand()))));
+ TaggedPValue.ofExpandedValue(original),
+ TaggedPValue.ofExpandedValue(replacement))));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
index ad9d908..90bbee7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.Iterables;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.CountingInput;
@@ -34,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -55,7 +55,7 @@ public class EmptyFlattenAsCreateFactoryTest {
@Test
public void getInputEmptySucceeds() {
assertThat(
- factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline).size(), equalTo(0));
+ factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline).size(), equalTo(0));
}
@Test
@@ -80,8 +80,8 @@ public class EmptyFlattenAsCreateFactoryTest {
Matchers.<PValue, ReplacementOutput>hasEntry(
replacement,
ReplacementOutput.of(
- Iterables.getOnlyElement(original.expand()),
- Iterables.getOnlyElement(replacement.expand()))));
+ TaggedPValue.ofExpandedValue(original),
+ TaggedPValue.ofExpandedValue(replacement))));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index d053f62..4084cdc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -65,7 +65,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matchers;
@@ -385,17 +385,15 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
- of(
- "EmptyFlatten",
- Collections.<TaggedPValue>emptyList(),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
- Flatten.pCollections(),
- p);
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ "EmptyFlatten",
+ Collections.<TupleTag<?>, PValue>emptyMap(),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ Flatten.pCollections(),
+ p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
}
@@ -404,21 +402,18 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonEmptyFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
- of(
- "Flatten",
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
- Flatten.pCollections(),
- p);
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+ "Flatten",
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ Flatten.pCollections(),
+ p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
@@ -427,18 +422,16 @@ public class PTransformMatchersTest implements Serializable {
public void emptyFlattenWithNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
- of(
- "EmptyFlatten",
- Collections.<TaggedPValue>emptyList(),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
- Flatten.iterables() /* This isn't actually possible to construct,
+ .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+ "EmptyFlatten",
+ Collections.<TupleTag<?>, PValue>emptyMap(),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ Flatten.iterables() /* This isn't actually possible to construct,
* but for the sake of example */,
- p);
+ p);
assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
}
@@ -447,19 +440,16 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsWithoutDuplicates() {
AppliedPTransform application =
AppliedPTransform
- .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
- of(
+ .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
"Flatten",
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
Flatten.pCollections(),
p);
@@ -468,21 +458,21 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void flattenWithDuplicateInputsWithDuplicates() {
- PCollection<Object> duplicate = PCollection.createPrimitiveOutputInternal(p,
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED);
+ PCollection<Object> duplicate =
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
AppliedPTransform application =
AppliedPTransform
.<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
"Flatten",
- ImmutableList.of(
- TaggedPValue.of(new TupleTag<Object>(), duplicate),
- TaggedPValue.of(new TupleTag<Object>(), duplicate)),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+ ImmutableMap.<TupleTag<?>, PValue>builder()
+ .put(new TupleTag<Object>(), duplicate)
+ .put(new TupleTag<Object>(), duplicate)
+ .build(),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
Flatten.pCollections(),
p);
@@ -493,15 +483,13 @@ public class PTransformMatchersTest implements Serializable {
public void flattenWithDuplicateInputsNonFlatten() {
AppliedPTransform application =
AppliedPTransform
- .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
- of(
+ .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
"EmptyFlatten",
- Collections.<TaggedPValue>emptyList(),
- Collections.singletonList(
- TaggedPValue.of(
- new TupleTag<Object>(),
- PCollection.createPrimitiveOutputInternal(
- p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+ Collections.<TupleTag<?>, PValue>emptyMap(),
+ Collections.<TupleTag<?>, PValue>singletonMap(
+ new TupleTag<Object>(),
+ PCollection.createPrimitiveOutputInternal(
+ p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)),
Flatten.iterables() /* This isn't actually possible to construct,
* but for the sake of example */,
p);
@@ -541,8 +529,8 @@ public class PTransformMatchersTest implements Serializable {
private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
"Write",
- Collections.<TaggedPValue>emptyList(),
- Collections.<TaggedPValue>emptyList(),
+ Collections.<TupleTag<?>, PValue>emptyMap(),
+ Collections.<TupleTag<?>, PValue>emptyMap(),
write,
p);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
index abfdeef..00c436d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -21,18 +21,15 @@ package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -79,8 +76,10 @@ public class ReplacementOutputsTest {
assertThat(replacements, Matchers.<PValue>hasKey(replacementInts));
ReplacementOutput replacement = replacements.get(replacementInts);
- TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand());
- assertThat(replacement.getOriginal(), equalTo(taggedInts));
+ Map.Entry<TupleTag<?>, PValue> taggedInts = Iterables.getOnlyElement(ints.expand().entrySet());
+ assertThat(
+ replacement.getOriginal().getTag(), Matchers.<TupleTag<?>>equalTo(taggedInts.getKey()));
+ assertThat(replacement.getOriginal().getValue(), equalTo(taggedInts.getValue()));
assertThat(replacement.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
}
@@ -88,44 +87,11 @@ public class ReplacementOutputsTest {
public void singletonMultipleOriginalsThrows() {
thrown.expect(IllegalArgumentException.class);
ReplacementOutputs.singleton(
- ImmutableList.copyOf(Iterables.concat(ints.expand(), moreInts.expand())), replacementInts);
- }
-
- @Test
- public void orderedSucceeds() {
- List<TaggedPValue> originals = PCollectionList.of(ints).and(moreInts).expand();
- Map<PValue, ReplacementOutput> replacements =
- ReplacementOutputs.ordered(
- originals, PCollectionList.of(replacementInts).and(moreReplacementInts));
- assertThat(
- replacements.keySet(),
- Matchers.<PValue>containsInAnyOrder(replacementInts, moreReplacementInts));
-
- ReplacementOutput intsMapping = replacements.get(replacementInts);
- assertThat(intsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(ints));
- assertThat(intsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
-
- ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts);
- assertThat(moreIntsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(moreInts));
- assertThat(
- moreIntsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(moreReplacementInts));
- }
-
- @Test
- public void orderedTooManyReplacements() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("same size");
- ReplacementOutputs.ordered(
- PCollectionList.of(ints).expand(),
- PCollectionList.of(replacementInts).and(moreReplacementInts));
- }
-
- @Test
- public void orderedTooFewReplacements() {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("same size");
- ReplacementOutputs.ordered(
- PCollectionList.of(ints).and(moreInts).expand(), PCollectionList.of(moreReplacementInts));
+ ImmutableMap.<TupleTag<?>, PValue>builder()
+ .putAll(ints.expand())
+ .putAll(moreInts.expand())
+ .build(),
+ replacementInts);
}
private TupleTag<Integer> intsTag = new TupleTag<>();
@@ -168,61 +134,6 @@ public class ReplacementOutputsTest {
TaggedPValue.of(moreIntsTag, moreReplacementInts))));
}
- /**
- * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where the first
- * argument contains multiple copies of the same {@link TaggedPValue}, the call succeeds using
- * that mapping.
- */
- @Test
- public void taggedMultipleInstances() {
- List<TaggedPValue> original =
- ImmutableList.of(
- TaggedPValue.of(intsTag, ints),
- TaggedPValue.of(strsTag, strs),
- TaggedPValue.of(intsTag, ints));
-
- Map<PValue, ReplacementOutput> replacements =
- ReplacementOutputs.tagged(
- original, PCollectionTuple.of(strsTag, replacementStrs).and(intsTag, replacementInts));
- assertThat(
- replacements.keySet(),
- Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts));
- ReplacementOutput intsReplacement = replacements.get(replacementInts);
- ReplacementOutput strsReplacement = replacements.get(replacementStrs);
-
- assertThat(
- intsReplacement,
- equalTo(
- ReplacementOutput.of(
- TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
- assertThat(
- strsReplacement,
- equalTo(
- ReplacementOutput.of(
- TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
- }
-
- /**
- * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where a single tag
- * has multiple {@link PValue PValues} mapped to it, the call fails.
- */
- @Test
- public void taggedMultipleConflictingInstancesThrows() {
- List<TaggedPValue> original =
- ImmutableList.of(
- TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, moreReplacementInts));
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("different values");
- thrown.expectMessage(intsTag.toString());
- thrown.expectMessage(ints.toString());
- thrown.expectMessage(moreReplacementInts.toString());
- ReplacementOutputs.tagged(
- original,
- PCollectionTuple.of(strsTag, replacementStrs)
- .and(moreIntsTag, moreReplacementInts)
- .and(intsTag, replacementInts));
- }
-
@Test
public void taggedMissingReplacementThrows() {
PCollectionTuple original =
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
index b4cdd1f..07352f5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
@@ -32,6 +31,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
@@ -97,8 +97,8 @@ public class SingleInputOutputOverrideFactoryTest implements Serializable {
Matchers.<PValue, ReplacementOutput>hasEntry(
reappliedOutput,
ReplacementOutput.of(
- Iterables.getOnlyElement(output.expand()),
- Iterables.getOnlyElement(reappliedOutput.expand()))));
+ TaggedPValue.ofExpandedValue(output),
+ TaggedPValue.ofExpandedValue(reappliedOutput))));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
index f33d0f9..81ce00d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java
@@ -23,7 +23,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -53,13 +54,13 @@ public class UnsupportedOverrideFactoryTest {
public void getInputThrows() {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(message);
- factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline);
+ factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline);
}
@Test
public void mapOutputThrows() {
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage(message);
- factory.mapOutputs(Collections.<TaggedPValue>emptyList(), PDone.in(pipeline));
+ factory.mapOutputs(Collections.<TupleTag<?>, PValue>emptyMap(), PDone.in(pipeline));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 57735e7..5bd6f7e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -117,7 +117,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
ExecutorService executor) {
this.evaluationContext = evaluationContext;
this.outputPCollection =
- (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
+ (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values());
this.resultBuilder = StepTransformResult.withoutHold(transform);
this.minimumDynamicSplitSize = minimumDynamicSplitSize;
this.produceSplitExecutor = executor;
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 7e6845d..c342136 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
/**
* Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -83,8 +82,8 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
if (node.getInputs().isEmpty()) {
rootTransforms.add(appliedTransform);
} else {
- for (TaggedPValue value : node.getInputs()) {
- primitiveConsumers.put(value.getValue(), appliedTransform);
+ for (PValue value : node.getInputs().values()) {
+ primitiveConsumers.put(value, appliedTransform);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 8b9f995..c802c58 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -534,8 +534,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
.createKeyedBundle(
transformTimers.getKey(),
(PCollection)
- Iterables.getOnlyElement(transformTimers.getTransform().getInputs())
- .getValue())
+ Iterables.getOnlyElement(
+ transformTimers.getTransform().getInputs().values()))
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
scheduleConsumption(
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 8528905..7c6d2a1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -57,7 +57,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
- (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+ (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs().values()));
final TransformResult<InputT> result =
StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
return new FlattenEvaluator<>(outputBundle, result);
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 04e5aaa..f7fd4cf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -162,7 +162,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
evaluationContext.createKeyedBundle(
structuralKey,
(PCollection<KV<K, Iterable<V>>>)
- Iterables.getOnlyElement(application.getOutputs()).getValue());
+ Iterables.getOnlyElement(application.getOutputs().values()));
outputBundles.add(bundle);
CopyOnAccessInMemoryStateInternals<K> stateInternals =
(CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index ef96a92..ac0b14f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -105,7 +105,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
this.application = application;
this.keyCoder =
getKeyCoder(
- ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs()).getValue())
+ ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs().values()))
.getCoder());
this.groupingMap = new HashMap<>();
}
@@ -158,7 +158,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
evaluationContext.createKeyedBundle(
StructuralKey.of(key, keyCoder),
(PCollection<KeyedWorkItem<K, V>>)
- Iterables.getOnlyElement(application.getOutputs()).getValue());
+ Iterables.getOnlyElement(application.getOutputs().values()));
bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
resultBuilder.addOutput(bundle);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 02b1bed..f9b6eba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
-import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
@@ -83,9 +83,9 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
if (node.isRootNode()) {
finalized = true;
} else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) {
- List<TaggedPValue> outputs = node.getOutputs();
- for (TaggedPValue output : outputs) {
- keyedValues.add(output.getValue());
+ Map<TupleTag<?>, PValue> outputs = node.getOutputs();
+ for (PValue output : outputs.values()) {
+ keyedValues.add(output);
}
}
}
@@ -96,8 +96,8 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
boolean inputsAreKeyed = true;
- for (TaggedPValue input : producer.getInputs()) {
- inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input.getValue());
+ for (PValue input : producer.getInputs().values()) {
+ inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input);
}
if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass())
|| (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) {
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b0e97fb..b8a13e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
evaluationContext,
stepContext,
application,
- ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs()).getValue())
+ ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values()))
.getWindowingStrategy(),
fn,
key,
@@ -162,10 +162,10 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
}
}
- private Map<TupleTag<?>, PCollection<?>> pcollections(List<TaggedPValue> outputs) {
+ private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
- for (TaggedPValue output : outputs) {
- pcs.put(output.getTag(), (PCollection<?>) output.getValue());
+ for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+ pcs.put(output.getKey(), (PCollection<?>) output.getValue());
}
return pcs;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 056a0c3..00c0d6a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Iterables;
-import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
@@ -50,7 +49,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;
@@ -86,13 +85,13 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
@Override
public PCollection<? extends InputT> getInput(
- List<TaggedPValue> inputs, Pipeline p) {
- return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
+ Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+ return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs.values());
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
return ReplacementOutputs.tagged(outputs, newOutput);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 77bebb2..f8fe3d6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
@@ -52,7 +53,7 @@ import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
/** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
@@ -139,8 +140,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
String stepName = evaluationContext.getStepName(transformOutputWindow.getTransform());
Map<TupleTag<?>, PCollection<?>> taggedValues = new HashMap<>();
- for (TaggedPValue pv : transformOutputWindow.getTransform().getOutputs()) {
- taggedValues.put(pv.getTag(), (PCollection<?>) pv.getValue());
+ for (Entry<TupleTag<?>, PValue> pv :
+ transformOutputWindow.getTransform().getOutputs().entrySet()) {
+ taggedValues.put(pv.getKey(), (PCollection<?>) pv.getValue());
}
PCollection<?> pc =
taggedValues
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 0dd8919..6e0a4fc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -48,8 +48,8 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -108,7 +108,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
if (event.getType().equals(EventType.ELEMENT)) {
UncommittedBundle<T> bundle =
context.createBundle(
- (PCollection<T>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+ (PCollection<T>) Iterables.getOnlyElement(application.getOutputs().values()));
for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
bundle.add(
WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
@@ -176,13 +176,13 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+ public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
return p.begin();
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollection<T> newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index f0eef58..91e7248 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -121,7 +121,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
UncommittedBundle<OutputT> output =
evaluationContext.createBundle(
- (PCollection<OutputT>) getOnlyElement(transform.getOutputs()).getValue());
+ (PCollection<OutputT>) getOnlyElement(transform.getOutputs().values()));
UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
UnboundedReader<OutputT> reader = null;
try {
@@ -227,7 +227,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// committing the output.
if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
PCollection<OutputT> outputPc =
- (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
+ (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values());
evaluationContext.scheduleAfterOutputWouldBeProduced(
outputPc,
GlobalWindow.INSTANCE,
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index dc74d3e..8cbe8fc 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -64,9 +64,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
application) {
PCollection<Iterable<InT>> input =
- (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs()).getValue();
+ (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs().values());
final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input,
- (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
+ (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs().values()));
return new TransformEvaluator<Iterable<InT>>() {
private final List<WindowedValue<InT>> elements = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 64e1218..52dc329 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import com.google.common.collect.Iterables;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
import org.apache.beam.sdk.Pipeline;
@@ -35,7 +34,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} that provides overrides for the {@link CreatePCollectionView}
@@ -51,13 +50,13 @@ class ViewOverrideFactory<ElemT, ViewT>
}
@Override
- public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) {
- return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
+ public PCollection<ElemT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+ return (PCollection<ElemT>) Iterables.getOnlyElement(inputs.values());
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
- List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
+ Map<TupleTag<?>, PValue> outputs, PCollectionView<ViewT> newOutput) {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 942e67c..8c04362 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -61,7 +61,8 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
@@ -818,13 +819,13 @@ public class WatermarkManager {
private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
- List<TaggedPValue> inputs = transform.getInputs();
+ Map<TupleTag<?>, PValue> inputs = transform.getInputs();
if (inputs.isEmpty()) {
inputWmsBuilder.add(THE_END_OF_TIME);
}
- for (TaggedPValue pvalue : inputs) {
+ for (PValue pvalue : inputs.values()) {
Watermark producerOutputWatermark =
- getTransformWatermark(graph.getProducer(pvalue.getValue()))
+ getTransformWatermark(graph.getProducer(pvalue))
.synchronizedProcessingOutputWatermark;
inputWmsBuilder.add(producerOutputWatermark);
}
@@ -833,13 +834,13 @@ public class WatermarkManager {
private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
- List<TaggedPValue> inputs = transform.getInputs();
+ Map<TupleTag<?>, PValue> inputs = transform.getInputs();
if (inputs.isEmpty()) {
inputWatermarksBuilder.add(THE_END_OF_TIME);
}
- for (TaggedPValue pvalue : inputs) {
+ for (PValue pvalue : inputs.values()) {
Watermark producerOutputWatermark =
- getTransformWatermark(graph.getProducer(pvalue.getValue())).outputWatermark;
+ getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
inputWatermarksBuilder.add(producerOutputWatermark);
}
List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
@@ -1023,8 +1024,8 @@ public class WatermarkManager {
WatermarkUpdate updateResult = myWatermarks.refresh();
if (updateResult.isAdvanced()) {
Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
- for (TaggedPValue outputPValue : toRefresh.getOutputs()) {
- additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue()));
+ for (PValue outputPValue : toRefresh.getOutputs().values()) {
+ additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue));
}
return additionalRefreshes;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 8974c67..2550924 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -57,7 +57,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(
- (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue());
+ (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values()));
if (fn == null) {
return PassthroughTransformEvaluator.create(transform, outputBundle);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 1bf5839..b3f92ab 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,7 +24,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
@@ -41,7 +40,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms}
@@ -60,12 +59,13 @@ class WriteWithShardingFactory<InputT>
}
@Override
- public PCollection<InputT> getInput(List<TaggedPValue> inputs, Pipeline p) {
- return (PCollection<InputT>) Iterables.getOnlyElement(inputs).getValue();
+ public PCollection<InputT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+ return (PCollection<InputT>) Iterables.getOnlyElement(inputs.values());
}
@Override
- public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, PDone newOutput) {
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
return Collections.emptyMap();
}