You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/23 17:04:08 UTC

[beam] branch spark-runner_structured-streaming updated: Fail in case of having SideInputs or State/Timers

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new ff7a24f  Fail in case of having SideInputs or State/Timers
ff7a24f is described below

commit ff7a24fadd7bbd5d53e935e138fe97a62328dc58
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Wed Jan 23 18:02:18 2019 +0100

    Fail in case of having SideInputs or State/Timers
---
 .../translation/TranslationContext.java                  |  5 -----
 .../translation/batch/ParDoTranslatorBatch.java          | 16 ++++++++++++++--
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 2837125..bf7053d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -136,11 +136,6 @@ public class TranslationContext {
   }
 
   @SuppressWarnings("unchecked")
-  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
-  }
-
-  @SuppressWarnings("unchecked")
   public Map<TupleTag<?>, PValue> getInputs() {
     return currentTransform.getInputs();
   }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index f80db9a..0b39b8b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.*;
@@ -47,7 +48,8 @@ import java.util.Map;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
- * TODO: Add support of state and timers TODO: Add support of side inputs
+ * TODO: Add support of state and timers
+ * TODO: Add support of side inputs
  *
  * @param <InputT>
  * @param <OutputT>
@@ -59,12 +61,19 @@ class ParDoTranslatorBatch<InputT, OutputT>
   public void translateTransform(
       PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {
 
+    // TODO: add support of Splittable DoFn
     DoFn<InputT, OutputT> doFn = getDoFn(context);
     checkState(
         !DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
         "Not expected to directly translate splittable DoFn, should have been overridden: %s",
         doFn);
 
+    // TODO: add support of states and timers
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+    boolean stateful =
+        signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
+    checkState(!stateful, "States and timers are not supported for the moment.");
+
     Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     TupleTag<?> mainOutputTag = getTupleTag(context);
@@ -109,6 +118,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
     UnionCoder unionCoder = UnionCoder.of(outputCoders);
 
     List<PCollectionView<?>> sideInputs = getSideInputs(context);
+    final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
+    // TODO: add support of SideInputs
+    checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
 
     // construct a map from side input to WindowingStrategy so that
     // the DoFn runner can map main-input windows to side input windows
@@ -128,7 +140,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             context.getOptions(),
             outputTags,
             mainOutputTag,
-            context.getInput(transform).getCoder(),
+            ((PCollection<InputT>)context.getInput()).getCoder(),
             outputCoderMap);
 
     Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputsDataset =