You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2019/12/02 23:36:40 UTC
[beam] 01/03: [BEAM-2929] Ensure that the Beam Java SDK sends the
property "use_indexed_format" to Dataflow for side inputs which use a
multimap materialization.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8e71e59eae495ab74c5c20f50456a5c9c5416905
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 10:39:12 2019 -0800
[BEAM-2929] Ensure that the Beam Java SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
.../dataflow/DataflowPipelineTranslator.java | 8 ++-
.../dataflow/DataflowPipelineTranslatorTest.java | 62 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 41e5cbb..45e0c33 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -710,8 +711,11 @@ public class DataflowPipelineTranslator {
String generatedName = String.format("%s.out%d", stepName, outputInfoList.size());
addString(outputInfo, PropertyNames.USER_NAME, generatedName);
- if (value instanceof PCollection
- && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+ if ((value instanceof PCollection
+ && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value))
+ || ((value instanceof PCollectionView)
+ && (Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
+ ((PCollectionView) value).getViewFn().getMaterialization().getUrn())))) {
addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
}
if (valueCoder != null) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 85b1e22..c5c3ddc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -736,8 +736,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Test
public void testSplittableParDoTranslationFnApi() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
- DataflowRunner runner = DataflowRunner.fromOptions(options);
options.setExperiments(Arrays.asList("beam_fn_api"));
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
@@ -852,6 +852,66 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
@Test
+ public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
+ // Change-detector test: with the "beam_fn_api" experiment set, View.asSingleton()
+ // must translate to 14 steps, the last being a CollectionToSingleton step whose
+ // only output has "use_indexed_format" set to true.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setExperiments(Arrays.asList("beam_fn_api"));
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(Create.of(1)).apply(View.asSingleton());
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ runner.replaceTransforms(pipeline);
+ Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
+
+ List<Step> steps = job.getSteps();
+ assertEquals(14, steps.size());
+
+ Step collectionToSingletonStep = steps.get(steps.size() - 1);
+ assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> ctsOutputs =
+ (List<Map<String, Object>>)
+ steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
+ assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
+ }
+
+ @Test
+ public void testToIterableTranslationWithFnApiSideInput() throws Exception {
+ // Change-detector test: with the "beam_fn_api" experiment set, View.asIterable()
+ // must translate to 10 steps, the last being a CollectionToSingleton step whose
+ // only output has "use_indexed_format" set to true.
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setExperiments(Arrays.asList("beam_fn_api"));
+ DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());
+
+ DataflowRunner runner = DataflowRunner.fromOptions(options);
+ runner.replaceTransforms(pipeline);
+ Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
+ assertAllStepOutputsHaveUniqueIds(job);
+
+ List<Step> steps = job.getSteps();
+ assertEquals(10, steps.size());
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> ctsOutputs =
+ (List<Map<String, Object>>)
+ steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
+ assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
+ Step collectionToSingletonStep = steps.get(steps.size() - 1);
+ assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+ }
+
+ @Test
public void testStepDisplayData() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);