You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/05/10 09:47:12 UTC

[beam] 01/03: Pass transform based doFnSchemaInformation in ParDo translation

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ad22b6828cf7b78945e10cd60362c27a0447e66a
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Fri May 10 11:44:53 2019 +0200

    Pass transform based doFnSchemaInformation in ParDo translation
---
 .../structuredstreaming/translation/batch/ParDoTranslatorBatch.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b16d7e9..400b025 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -77,6 +77,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
         signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
     checkState(!stateful, "States and timers are not supported for the moment.");
 
+    DoFnSchemaInformation doFnSchemaInformation =
+        ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
+
     // Init main variables
     Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
@@ -110,7 +113,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             inputCoder,
             outputCoderMap,
             broadcastStateData,
-            DoFnSchemaInformation.create());
+            doFnSchemaInformation);
 
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
         inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());