You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/14 21:41:10 UTC

[2/4] beam git commit: Rename DoFn.Context#sideOutput to output

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index 211dfd9..f752b1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -55,12 +55,12 @@ public class TypedPValueTest {
   }
 
   private PCollectionTuple buildPCollectionTupleWithTags(
-      TupleTag<Integer> mainOutputTag, TupleTag<Integer> sideOutputTag) {
+      TupleTag<Integer> mainOutputTag, TupleTag<Integer> additionalOutputTag) {
     PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
     PCollectionTuple tuple = input.apply(
         ParDo
         .of(new IdentityDoFn())
-        .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+        .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
     return tuple;
   }
 
@@ -69,11 +69,11 @@ public class TypedPValueTest {
   }
 
   @Test
-  public void testUntypedSideOutputTupleTagGivesActionableMessage() {
+  public void testUntypedOutputTupleTagGivesActionableMessage() {
     TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
-    // untypedSideOutputTag did not use anonymous subclass.
-    TupleTag<Integer> untypedSideOutputTag = new TupleTag<Integer>();
-    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag);
+    // untypedOutputTag was not created as an anonymous subclass.
+    TupleTag<Integer> untypedOutputTag = new TupleTag<Integer>();
+    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
@@ -84,15 +84,15 @@ public class TypedPValueTest {
     thrown.expectMessage(
         containsString("Building a Coder from the fallback CoderProvider failed"));
 
-    tuple.get(untypedSideOutputTag).getCoder();
+    tuple.get(untypedOutputTag).getCoder();
   }
 
   @Test
-  public void testStaticFactorySideOutputTupleTagGivesActionableMessage() {
+  public void testStaticFactoryOutputTupleTagGivesActionableMessage() {
     TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
-    // untypedSideOutputTag constructed from a static factory method.
-    TupleTag<Integer> untypedSideOutputTag = makeTagStatically();
-    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag);
+    // untypedOutputTag constructed from a static factory method.
+    TupleTag<Integer> untypedOutputTag = makeTagStatically();
+    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("No Coder has been manually specified");
@@ -103,27 +103,27 @@ public class TypedPValueTest {
     thrown.expectMessage(
         containsString("Building a Coder from the fallback CoderProvider failed"));
 
-    tuple.get(untypedSideOutputTag).getCoder();
+    tuple.get(untypedOutputTag).getCoder();
   }
 
   @Test
-  public void testTypedSideOutputTupleTag() {
+  public void testTypedOutputTupleTag() {
     TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
-    // typedSideOutputTag was constructed with compile-time type information.
-    TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {};
-    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag);
+    // typedOutputTag was constructed with compile-time type information.
+    TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {};
+    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag);
 
-    assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class));
+    assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
   }
 
   @Test
-  public void testUntypedMainOutputTagTypedSideOutputTupleTag() {
+  public void testUntypedMainOutputTagTypedOutputTupleTag() {
     // mainOutputTag is allowed to be untyped because Coder can be inferred other ways.
     TupleTag<Integer> mainOutputTag = new TupleTag<>();
-    TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {};
-    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag);
+    TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {};
+    PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag);
 
-    assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class));
+    assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
   }
 
   // A simple class for which there should be no obvious Coder.
@@ -139,13 +139,13 @@ public class TypedPValueTest {
   }
 
   @Test
-  public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() {
+  public void testParDoWithNoOutputsErrorDoesNotMentionTupleTag() {
     PCollection<EmptyClass> input =
         p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
 
     thrown.expect(IllegalStateException.class);
 
-    // Output specific to ParDo TupleTag side outputs should not be present.
+    // Message text specific to a ParDo's additional TupleTag outputs should not be present.
     thrown.expectMessage(not(containsString("erasure")));
     thrown.expectMessage(not(containsString("see TupleTag Javadoc")));
     // Instead, expect output suggesting other possible fixes.

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 6403e96..9714d72 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -46,7 +46,7 @@ public class FakeStepContext implements StepContext {
   }
 
   @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index de105d7..bd2fba9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -300,7 +300,7 @@ public class ProcessBundleHandlerTest {
 
   private static class TestDoFn extends DoFn<String, String> {
     private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
-    private static final TupleTag<String> sideOutput = new TupleTag<>("sideOutput");
+    private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
 
     @StartBundle
     public void startBundle(Context context) {
@@ -310,7 +310,7 @@ public class ProcessBundleHandlerTest {
     @ProcessElement
     public void processElement(ProcessContext context) {
       context.output("MainOutput" + context.element());
-      context.sideOutput(sideOutput, "SideOutput" + context.element());
+      context.output(additionalOutput, "AdditionalOutput" + context.element());
     }
 
     @FinishBundle
@@ -321,7 +321,7 @@ public class ProcessBundleHandlerTest {
 
   /**
    * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs
-   * (mainOutput, sideOutput). Validate that inputs are fed to the {@link DoFn} and that outputs
+   * (mainOutput, additionalOutput). Validate that inputs are fed to the {@link DoFn} and outputs
    * are directed to the correct consumers.
    */
   @Test
@@ -329,7 +329,7 @@ public class ProcessBundleHandlerTest {
     Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
     String primitiveTransformId = "100L";
     long mainOutputId = 101L;
-    long sideOutputId = 102L;
+    long additionalOutputId = 102L;
 
     DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
         new TestDoFn(),
@@ -339,7 +339,7 @@ public class ProcessBundleHandlerTest {
         mainOutputId,
         ImmutableMap.of(
             mainOutputId, TestDoFn.mainOutput,
-            sideOutputId, TestDoFn.sideOutput));
+            additionalOutputId, TestDoFn.additionalOutput));
     BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
         .setId("1L")
         .setUrn(JAVA_DO_FN_URN)
@@ -372,25 +372,25 @@ public class ProcessBundleHandlerTest {
         .putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder()
             .setCoderReference(STRING_CODER_SPEC_ID)
             .build())
-        .putOutputs(Long.toString(sideOutputId), BeamFnApi.PCollection.newBuilder()
+        .putOutputs(Long.toString(additionalOutputId), BeamFnApi.PCollection.newBuilder()
             .setCoderReference(STRING_CODER_SPEC_ID)
             .build())
         .build();
 
     List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
-    List<WindowedValue<String>> sideOutputValues = new ArrayList<>();
+    List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
     BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder()
         .setPrimitiveTransformReference(primitiveTransformId)
         .setName(Long.toString(mainOutputId))
         .build();
-    BeamFnApi.Target sideOutputTarget = BeamFnApi.Target.newBuilder()
+    BeamFnApi.Target additionalOutputTarget = BeamFnApi.Target.newBuilder()
         .setPrimitiveTransformReference(primitiveTransformId)
-        .setName(Long.toString(sideOutputId))
+        .setName(Long.toString(additionalOutputId))
         .build();
     Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
         ImmutableMultimap.of(
             mainOutputTarget, mainOutputValues::add,
-            sideOutputTarget, sideOutputValues::add);
+            additionalOutputTarget, additionalOutputValues::add);
     Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
         HashMultimap.create();
     List<ThrowingRunnable> startFunctions = new ArrayList<>();
@@ -422,12 +422,12 @@ public class ProcessBundleHandlerTest {
         valueInGlobalWindow("MainOutputA1"),
         valueInGlobalWindow("MainOutputA2"),
         valueInGlobalWindow("MainOutputB")));
-    assertThat(sideOutputValues, contains(
-        valueInGlobalWindow("SideOutputA1"),
-        valueInGlobalWindow("SideOutputA2"),
-        valueInGlobalWindow("SideOutputB")));
+    assertThat(additionalOutputValues, contains(
+        valueInGlobalWindow("AdditionalOutputA1"),
+        valueInGlobalWindow("AdditionalOutputA2"),
+        valueInGlobalWindow("AdditionalOutputB")));
     mainOutputValues.clear();
-    sideOutputValues.clear();
+    additionalOutputValues.clear();
 
     Iterables.getOnlyElement(finishFunctions).run();
     assertThat(mainOutputValues, contains(valueInGlobalWindow("FinishBundle")));

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index f4bf198..1b6492e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -61,7 +61,7 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
       KV<String, Long> fileResult = results.get(i);
       if (currNumFiles + 1 > Write.MAX_NUM_FILES
           || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
-        c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+        c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
         currResults = Lists.newArrayList();
         currNumFiles = 0;
         currSizeBytes = 0;
@@ -71,9 +71,9 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
       currResults.add(fileResult.getKey());
     }
     if (partitionId == 0) {
-      c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
+      c.output(singlePartitionTag, KV.of(++partitionId, currResults));
     } else {
-      c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+      c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 92ab204..2a2bf91 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -2107,9 +2107,9 @@ public class BigQueryIOTest implements Serializable {
 
     List<KV<Long, List<String>>> partitions;
     if (expectedNumPartitions > 1) {
-      partitions = tester.takeSideOutputElements(multiPartitionsTag);
+      partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {
-      partitions = tester.takeSideOutputElements(singlePartitionTag);
+      partitions = tester.takeOutputElements(singlePartitionTag);
     }
     List<Long> partitionIds = Lists.newArrayList();
     List<String> partitionFileNames = Lists.newArrayList();