You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/14 21:41:10 UTC
[2/4] beam git commit: Rename DoFn.Context#sideOutput to output
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index 211dfd9..f752b1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -55,12 +55,12 @@ public class TypedPValueTest {
}
private PCollectionTuple buildPCollectionTupleWithTags(
- TupleTag<Integer> mainOutputTag, TupleTag<Integer> sideOutputTag) {
+ TupleTag<Integer> mainOutputTag, TupleTag<Integer> additionalOutputTag) {
PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
PCollectionTuple tuple = input.apply(
ParDo
.of(new IdentityDoFn())
- .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
+ .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)));
return tuple;
}
@@ -69,11 +69,11 @@ public class TypedPValueTest {
}
@Test
- public void testUntypedSideOutputTupleTagGivesActionableMessage() {
+ public void testUntypedOutputTupleTagGivesActionableMessage() {
TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
- // untypedSideOutputTag did not use anonymous subclass.
- TupleTag<Integer> untypedSideOutputTag = new TupleTag<Integer>();
- PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag);
+ // untypedOutputTag did not use anonymous subclass.
+ TupleTag<Integer> untypedOutputTag = new TupleTag<Integer>();
+ PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("No Coder has been manually specified");
@@ -84,15 +84,15 @@ public class TypedPValueTest {
thrown.expectMessage(
containsString("Building a Coder from the fallback CoderProvider failed"));
- tuple.get(untypedSideOutputTag).getCoder();
+ tuple.get(untypedOutputTag).getCoder();
}
@Test
- public void testStaticFactorySideOutputTupleTagGivesActionableMessage() {
+ public void testStaticFactoryOutputTupleTagGivesActionableMessage() {
TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
- // untypedSideOutputTag constructed from a static factory method.
- TupleTag<Integer> untypedSideOutputTag = makeTagStatically();
- PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag);
+ // untypedOutputTag constructed from a static factory method.
+ TupleTag<Integer> untypedOutputTag = makeTagStatically();
+ PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("No Coder has been manually specified");
@@ -103,27 +103,27 @@ public class TypedPValueTest {
thrown.expectMessage(
containsString("Building a Coder from the fallback CoderProvider failed"));
- tuple.get(untypedSideOutputTag).getCoder();
+ tuple.get(untypedOutputTag).getCoder();
}
@Test
- public void testTypedSideOutputTupleTag() {
+ public void testTypedOutputTupleTag() {
TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {};
- // typedSideOutputTag was constructed with compile-time type information.
- TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {};
- PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag);
+ // typedOutputTag was constructed with compile-time type information.
+ TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {};
+ PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag);
- assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class));
+ assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
}
@Test
- public void testUntypedMainOutputTagTypedSideOutputTupleTag() {
+ public void testUntypedMainOutputTagTypedOutputTupleTag() {
// mainOutputTag is allowed to be untyped because Coder can be inferred other ways.
TupleTag<Integer> mainOutputTag = new TupleTag<>();
- TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {};
- PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag);
+ TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {};
+ PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag);
- assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class));
+ assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class));
}
// A simple class for which there should be no obvious Coder.
@@ -139,13 +139,13 @@ public class TypedPValueTest {
}
@Test
- public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() {
+ public void testParDoWithNoOutputsErrorDoesNotMentionTupleTag() {
PCollection<EmptyClass> input =
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn()));
thrown.expect(IllegalStateException.class);
- // Output specific to ParDo TupleTag side outputs should not be present.
+ // Error message text specific to additional (tagged) ParDo outputs should not be present.
thrown.expectMessage(not(containsString("erasure")));
thrown.expectMessage(not(containsString("see TupleTag Javadoc")));
// Instead, expect output suggesting other possible fixes.
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index 6403e96..9714d72 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -46,7 +46,7 @@ public class FakeStepContext implements StepContext {
}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index de105d7..bd2fba9 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -300,7 +300,7 @@ public class ProcessBundleHandlerTest {
private static class TestDoFn extends DoFn<String, String> {
private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
- private static final TupleTag<String> sideOutput = new TupleTag<>("sideOutput");
+ private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
@StartBundle
public void startBundle(Context context) {
@@ -310,7 +310,7 @@ public class ProcessBundleHandlerTest {
@ProcessElement
public void processElement(ProcessContext context) {
context.output("MainOutput" + context.element());
- context.sideOutput(sideOutput, "SideOutput" + context.element());
+ context.output(additionalOutput, "AdditionalOutput" + context.element());
}
@FinishBundle
@@ -321,7 +321,7 @@ public class ProcessBundleHandlerTest {
/**
* Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs
- * (mainOutput, sideOutput). Validate that inputs are fed to the {@link DoFn} and that outputs
+ * (mainOutput, additionalOutput). Validate that inputs are fed to the {@link DoFn} and that outputs
* are directed to the correct consumers.
*/
@Test
@@ -329,7 +329,7 @@ public class ProcessBundleHandlerTest {
Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
String primitiveTransformId = "100L";
long mainOutputId = 101L;
- long sideOutputId = 102L;
+ long additionalOutputId = 102L;
DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn(
new TestDoFn(),
@@ -339,7 +339,7 @@ public class ProcessBundleHandlerTest {
mainOutputId,
ImmutableMap.of(
mainOutputId, TestDoFn.mainOutput,
- sideOutputId, TestDoFn.sideOutput));
+ additionalOutputId, TestDoFn.additionalOutput));
BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder()
.setId("1L")
.setUrn(JAVA_DO_FN_URN)
@@ -372,25 +372,25 @@ public class ProcessBundleHandlerTest {
.putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder()
.setCoderReference(STRING_CODER_SPEC_ID)
.build())
- .putOutputs(Long.toString(sideOutputId), BeamFnApi.PCollection.newBuilder()
+ .putOutputs(Long.toString(additionalOutputId), BeamFnApi.PCollection.newBuilder()
.setCoderReference(STRING_CODER_SPEC_ID)
.build())
.build();
List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
- List<WindowedValue<String>> sideOutputValues = new ArrayList<>();
+ List<WindowedValue<String>> additionalOutputValues = new ArrayList<>();
BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder()
.setPrimitiveTransformReference(primitiveTransformId)
.setName(Long.toString(mainOutputId))
.build();
- BeamFnApi.Target sideOutputTarget = BeamFnApi.Target.newBuilder()
+ BeamFnApi.Target additionalOutputTarget = BeamFnApi.Target.newBuilder()
.setPrimitiveTransformReference(primitiveTransformId)
- .setName(Long.toString(sideOutputId))
+ .setName(Long.toString(additionalOutputId))
.build();
Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers =
ImmutableMultimap.of(
mainOutputTarget, mainOutputValues::add,
- sideOutputTarget, sideOutputValues::add);
+ additionalOutputTarget, additionalOutputValues::add);
Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers =
HashMultimap.create();
List<ThrowingRunnable> startFunctions = new ArrayList<>();
@@ -422,12 +422,12 @@ public class ProcessBundleHandlerTest {
valueInGlobalWindow("MainOutputA1"),
valueInGlobalWindow("MainOutputA2"),
valueInGlobalWindow("MainOutputB")));
- assertThat(sideOutputValues, contains(
- valueInGlobalWindow("SideOutputA1"),
- valueInGlobalWindow("SideOutputA2"),
- valueInGlobalWindow("SideOutputB")));
+ assertThat(additionalOutputValues, contains(
+ valueInGlobalWindow("AdditionalOutputA1"),
+ valueInGlobalWindow("AdditionalOutputA2"),
+ valueInGlobalWindow("AdditionalOutputB")));
mainOutputValues.clear();
- sideOutputValues.clear();
+ additionalOutputValues.clear();
Iterables.getOnlyElement(finishFunctions).run();
assertThat(mainOutputValues, contains(valueInGlobalWindow("FinishBundle")));
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index f4bf198..1b6492e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -61,7 +61,7 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
KV<String, Long> fileResult = results.get(i);
if (currNumFiles + 1 > Write.MAX_NUM_FILES
|| currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
- c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+ c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
currResults = Lists.newArrayList();
currNumFiles = 0;
currSizeBytes = 0;
@@ -71,9 +71,9 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
currResults.add(fileResult.getKey());
}
if (partitionId == 0) {
- c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
+ c.output(singlePartitionTag, KV.of(++partitionId, currResults));
} else {
- c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+ c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 92ab204..2a2bf91 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -2107,9 +2107,9 @@ public class BigQueryIOTest implements Serializable {
List<KV<Long, List<String>>> partitions;
if (expectedNumPartitions > 1) {
- partitions = tester.takeSideOutputElements(multiPartitionsTag);
+ partitions = tester.takeOutputElements(multiPartitionsTag);
} else {
- partitions = tester.takeSideOutputElements(singlePartitionTag);
+ partitions = tester.takeOutputElements(singlePartitionTag);
}
List<Long> partitionIds = Lists.newArrayList();
List<String> partitionFileNames = Lists.newArrayList();