You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/23 17:04:08 UTC
[beam] branch spark-runner_structured-streaming updated: Fail in
case of having SideInputs or State/Timers
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new ff7a24f Fail in case of having SideInputs or State/Timers
ff7a24f is described below
commit ff7a24fadd7bbd5d53e935e138fe97a62328dc58
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Wed Jan 23 18:02:18 2019 +0100
Fail in case of having SideInputs or State/Timers
---
.../translation/TranslationContext.java | 5 -----
.../translation/batch/ParDoTranslatorBatch.java | 16 ++++++++++++++--
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 2837125..bf7053d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -136,11 +136,6 @@ public class TranslationContext {
}
@SuppressWarnings("unchecked")
- public <T extends PValue> T getInput(PTransform<T, ?> transform) {
- return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
- }
-
- @SuppressWarnings("unchecked")
public Map<TupleTag<?>, PValue> getInputs() {
return currentTransform.getInputs();
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index f80db9a..0b39b8b 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.*;
@@ -47,7 +48,8 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkState;
/**
- * TODO: Add support of state and timers TODO: Add support of side inputs
+ * TODO: Add support of state and timers
+ * TODO: Add support of side inputs
*
* @param <InputT>
* @param <OutputT>
@@ -59,12 +61,19 @@ class ParDoTranslatorBatch<InputT, OutputT>
public void translateTransform(
PTransform<PCollection<InputT>, PCollectionTuple> transform, TranslationContext context) {
+ // TODO: add support of Splittable DoFn
DoFn<InputT, OutputT> doFn = getDoFn(context);
checkState(
!DoFnSignatures.signatureForDoFn(doFn).processElement().isSplittable(),
"Not expected to directly translate splittable DoFn, should have been overridden: %s",
doFn);
+ // TODO: add support of states and timers
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+ boolean stateful =
+ signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
+ checkState(!stateful, "States and timers are not supported for the moment.");
+
Dataset<WindowedValue<InputT>> inputDataSet = context.getDataset(context.getInput());
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
TupleTag<?> mainOutputTag = getTupleTag(context);
@@ -109,6 +118,9 @@ class ParDoTranslatorBatch<InputT, OutputT>
UnionCoder unionCoder = UnionCoder.of(outputCoders);
List<PCollectionView<?>> sideInputs = getSideInputs(context);
+ final boolean hasSideInputs = sideInputs != null && sideInputs.size() > 0;
+ // TODO: add support of SideInputs
+ checkState(!hasSideInputs, "SideInputs are not supported for the moment.");
// construct a map from side input to WindowingStrategy so that
// the DoFn runner can map main-input windows to side input windows
@@ -128,7 +140,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
context.getOptions(),
outputTags,
mainOutputTag,
- context.getInput(transform).getCoder(),
+ ((PCollection<InputT>)context.getInput()).getCoder(),
outputCoderMap);
Dataset<Tuple2<TupleTag<?>, WindowedValue<?>>> allOutputsDataset =