You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:56:42 UTC

[11/50] [abbrv] beam git commit: Remove Orderedness of Input, Output expansions

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index f62b320..3638fc8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -17,21 +17,18 @@
  */
 package org.apache.beam.sdk.runners;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
@@ -224,11 +221,13 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(hierarchy.getCurrent(), equalTo(replacement));
     hierarchy.setOutput(replacementOutput);
 
-    TaggedPValue taggedOriginal = Iterables.getOnlyElement(originalOutput.expand());
-    TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacementOutput.expand());
+    TaggedPValue taggedReplacement = TaggedPValue.ofExpandedValue(replacementOutput);
     Map<PValue, ReplacementOutput> replacementOutputs =
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput, ReplacementOutput.of(taggedOriginal, taggedReplacement));
+            replacementOutput,
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(originalOutput),
+                taggedReplacement));
     hierarchy.replaceOutputs(replacementOutputs);
 
     assertThat(replacement.getInputs(), equalTo(original.getInputs()));
@@ -238,8 +237,9 @@ public class TransformHierarchyTest implements Serializable {
         replacement.getTransform(), Matchers.<PTransform<?, ?>>equalTo(replacementTransform));
     // THe tags of the replacement transform are matched to the appropriate PValues of the original
     assertThat(
-        replacement.getOutputs(),
-        contains(TaggedPValue.of(taggedReplacement.getTag(), taggedOriginal.getValue())));
+        replacement.getOutputs().keySet(),
+        Matchers.<TupleTag<?>>contains(taggedReplacement.getTag()));
+    assertThat(replacement.getOutputs().values(), Matchers.<PValue>contains(originalOutput));
     hierarchy.popNode();
   }
 
@@ -294,21 +294,23 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.popNode();
     hierarchy.setOutput(replacementOutput.get(longs));
 
-    TaggedPValue originalLongs = Iterables.getOnlyElement(output.expand());
-    TaggedPValue replacementLongs = Iterables.getOnlyElement(replacementOutput.expand());
+    Entry<TupleTag<?>, PValue>
+        replacementLongs = Iterables.getOnlyElement(replacementOutput.expand().entrySet());
     hierarchy.replaceOutputs(
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput.get(longs), ReplacementOutput.of(originalLongs, replacementLongs)));
+            replacementOutput.get(longs),
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(output),
+                TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue()))));
 
     assertThat(
-        replacementParNode.getOutputs(),
-        contains(TaggedPValue.of(replacementLongs.getTag(), originalLongs.getValue())));
+        replacementParNode.getOutputs().keySet(),
+        Matchers.<TupleTag<?>>contains(replacementLongs.getKey()));
+    assertThat(replacementParNode.getOutputs().values(), Matchers.<PValue>contains(output));
     assertThat(
-        compositeNode.getOutputs(),
-        contains(
-            TaggedPValue.of(
-                Iterables.getOnlyElement(replacementOutput.get(longs).expand()).getTag(),
-                originalLongs.getValue())));
+        compositeNode.getOutputs().keySet(),
+        equalTo(replacementOutput.get(longs).expand().keySet()));
+    assertThat(compositeNode.getOutputs().values(), Matchers.<PValue>contains(output));
     hierarchy.popNode();
   }
 
@@ -340,10 +342,10 @@ public class TransformHierarchyTest implements Serializable {
     TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create);
     hierarchy.finishSpecifyingInput();
     assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
-    assertThat(compositeNode.getInputs(), Matchers.emptyIterable());
+    assertThat(compositeNode.getInputs().entrySet(), Matchers.empty());
     assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create));
     // Not yet set
-    assertThat(compositeNode.getOutputs(), Matchers.emptyIterable());
+    assertThat(compositeNode.getOutputs().entrySet(), Matchers.emptyIterable());
     assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
 
     TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read);
@@ -351,16 +353,14 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.finishSpecifyingInput();
     hierarchy.setOutput(created);
     hierarchy.popNode();
-    assertThat(
-        fromTaggedValues(primitiveNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
-    assertThat(primitiveNode.getInputs(), Matchers.<TaggedPValue>emptyIterable());
+    assertThat(primitiveNode.getOutputs().values(), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(primitiveNode.getInputs().entrySet(), Matchers.emptyIterable());
     assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read));
     assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode));
 
     hierarchy.setOutput(created);
     // The composite is listed as outputting a PValue created by the contained primitive
-    assertThat(
-        fromTaggedValues(compositeNode.getOutputs()), Matchers.<PValue>containsInAnyOrder(created));
+    assertThat(compositeNode.getOutputs().values(), Matchers.<PValue>containsInAnyOrder(created));
     // The producer of that PValue is still the primitive in which it is first output
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
@@ -457,11 +457,14 @@ public class TransformHierarchyTest implements Serializable {
     hierarchy.popNode();
     hierarchy.setOutput(replacementOutput.get(longs));
 
-    TaggedPValue originalLongs = Iterables.getOnlyElement(output.expand());
-    TaggedPValue replacementLongs = Iterables.getOnlyElement(replacementOutput.expand());
+    Entry<TupleTag<?>, PValue> replacementLongs =
+        Iterables.getOnlyElement(replacementOutput.expand().entrySet());
     hierarchy.replaceOutputs(
         Collections.<PValue, ReplacementOutput>singletonMap(
-            replacementOutput.get(longs), ReplacementOutput.of(originalLongs, replacementLongs)));
+            replacementOutput.get(longs),
+            ReplacementOutput.of(
+                TaggedPValue.ofExpandedValue(output),
+                TaggedPValue.of(replacementLongs.getKey(), replacementLongs.getValue()))));
     hierarchy.popNode();
 
     final Set<Node> visitedCompositeNodes = new HashSet<>();
@@ -489,15 +492,4 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
-
-  private static List<PValue> fromTaggedValues(List<TaggedPValue> taggedValues) {
-    return Lists.transform(
-        taggedValues,
-        new Function<TaggedPValue, PValue>() {
-          @Override
-          public PValue apply(TaggedPValue input) {
-            return input.getValue();
-          }
-        });
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
index 2482f32..76cba01 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.sdk.values;
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -28,9 +28,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.testing.EqualsTester;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.hamcrest.Matchers;
@@ -90,27 +90,25 @@ public class PCollectionListTest {
     assertThat(
         fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxRecordsCount));
 
-    List<TaggedPValue> expansion = fromEmpty.expand();
-    // TaggedPValues are stable between expansions
+    Map<TupleTag<?>, PValue> expansion = fromEmpty.expand();
+    // Tag->PValue mappings are stable between expansions. They don't need to be stable across
+    // different list instances, though
     assertThat(expansion, equalTo(fromEmpty.expand()));
-    // TaggedPValues are equivalent between equivalent lists
-    assertThat(
-        expansion,
-        equalTo(
-            PCollectionList.of(unboundedCount)
-                .and(createOne)
-                .and(boundedCount)
-                .and(maxRecordsCount)
-                .expand()));
 
     List<PCollection<Long>> expectedList =
         ImmutableList.of(unboundedCount, createOne, boundedCount, maxRecordsCount);
-    for (int i = 0; i < expansion.size(); i++) {
-      assertThat(
-          "Index " + i + " should have equal PValue",
-          expansion.get(i).getValue(),
-          Matchers.<PValue>equalTo(expectedList.get(i)));
-    }
+    assertThat(expansion.values(), containsInAnyOrder(expectedList.toArray()));
+  }
+
+  @Test
+  public void testExpandWithDuplicates() {
+    Pipeline p = TestPipeline.create();
+    PCollection<Long> createOne = p.apply("CreateOne", Create.of(1L, 2L, 3L));
+
+    PCollectionList<Long> list = PCollectionList.of(createOne).and(createOne).and(createOne);
+    assertThat(
+        list.expand().values(),
+        Matchers.<PValue>containsInAnyOrder(createOne, createOne, createOne));
   }
 
   @Test
@@ -121,15 +119,15 @@ public class PCollectionListTest {
     PCollection<String> third = p.apply("Syntactic", Create.of("eggs", "baz"));
 
     EqualsTester tester = new EqualsTester();
-    tester.addEqualityGroup(PCollectionList.empty(p), PCollectionList.empty(p));
-    tester.addEqualityGroup(PCollectionList.of(first).and(second));
+//    tester.addEqualityGroup(PCollectionList.empty(p), PCollectionList.empty(p));
+//    tester.addEqualityGroup(PCollectionList.of(first).and(second));
     // Constructors should all produce equivalent
     tester.addEqualityGroup(
         PCollectionList.of(first).and(second).and(third),
         PCollectionList.of(first).and(second).and(third),
-        PCollectionList.<String>empty(p).and(first).and(second).and(third),
-        PCollectionList.of(ImmutableList.of(first, second, third)),
-        PCollectionList.of(first).and(ImmutableList.of(second, third)),
+//        PCollectionList.<String>empty(p).and(first).and(second).and(third),
+//        PCollectionList.of(ImmutableList.of(first, second, third)),
+//        PCollectionList.of(first).and(ImmutableList.of(second, third)),
         PCollectionList.of(ImmutableList.of(first, second)).and(third));
     // Order is considered
     tester.addEqualityGroup(PCollectionList.of(first).and(third).and(second));
@@ -137,28 +135,4 @@ public class PCollectionListTest {
 
     tester.testEquals();
   }
-
-  @Test
-  public void testExpansionOrderWithDuplicates() {
-    TestPipeline p = TestPipeline.create();
-    BoundedCountingInput count = CountingInput.upTo(10L);
-    PCollection<Long> firstCount = p.apply("CountFirst", count);
-    PCollection<Long> secondCount = p.apply("CountSecond", count);
-
-    PCollectionList<Long> counts =
-        PCollectionList.of(firstCount).and(secondCount).and(firstCount).and(firstCount);
-
-    ImmutableList<PCollection<Long>> expectedOrder =
-        ImmutableList.of(firstCount, secondCount, firstCount, firstCount);
-    PCollectionList<Long> reconstructed = PCollectionList.empty(p);
-    assertThat(counts.expand(), hasSize(4));
-    for (int i = 0; i < 4; i++) {
-      PValue value = counts.expand().get(i).getValue();
-      assertThat(
-          "Index " + i + " should be equal", value,
-          Matchers.<PValue>equalTo(expectedOrder.get(i)));
-      reconstructed = reconstructed.and((PCollection<Long>) value);
-    }
-    assertThat(reconstructed, equalTo(counts));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 010d726..0a0abd6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.PAssert;
@@ -153,8 +154,8 @@ public final class PCollectionTupleTest implements Serializable {
         PCollectionTuple.of(intTag, ints).and(longTag, longs).and(strTag, strs);
     assertThat(tuple.getAll(), equalTo(pcsByTag));
     PCollectionTuple reconstructed = PCollectionTuple.empty(p);
-    for (TaggedPValue taggedValue : tuple.expand()) {
-      TupleTag<?> tag = taggedValue.getTag();
+    for (Entry<TupleTag<?>, PValue> taggedValue : tuple.expand().entrySet()) {
+      TupleTag<?> tag = taggedValue.getKey();
       PValue value = taggedValue.getValue();
       assertThat("The tag should map back to the value", tuple.get(tag), equalTo(value));
       assertThat(value, Matchers.<PValue>equalTo(pcsByTag.get(tag)));

http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index 07fbc68..3e0f51c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -18,12 +18,11 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.util.Collections;
-import java.util.List;
-
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.POutputValueBase;
-import org.apache.beam.sdk.values.TaggedPValue;
-
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * The result of a {@link BigQueryIO.Write} transform.
@@ -37,8 +36,8 @@ final class WriteResult extends POutputValueBase {
   }
 
   @Override
-  public List<TaggedPValue> expand() {
-    return Collections.emptyList();
+  public Map<TupleTag<?>, PValue> expand() {
+    return Collections.emptyMap();
   }
 
   private WriteResult(Pipeline pipeline) {