You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/05/10 09:47:12 UTC
[beam] 01/03: Pass transform based doFnSchemaInformation in ParDo
translation
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit ad22b6828cf7b78945e10cd60362c27a0447e66a
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Fri May 10 11:44:53 2019 +0200
Pass transform based doFnSchemaInformation in ParDo translation
---
.../structuredstreaming/translation/batch/ParDoTranslatorBatch.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index b16d7e9..400b025 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -77,6 +77,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
checkState(!stateful, "States and timers are not supported for the moment.");
+ DoFnSchemaInformation doFnSchemaInformation =
+ ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
+
// Init main variables
Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
@@ -110,7 +113,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
inputCoder,
outputCoderMap,
broadcastStateData,
- DoFnSchemaInformation.create());
+ doFnSchemaInformation);
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputs =
inputDataSet.mapPartitions(doFnWrapper, EncoderHelpers.tuple2Encoder());