You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/18 18:14:50 UTC
[beam] branch spark-runner_structured-streaming updated: Address minor review notes
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 16ac4a0 Address minor review notes
16ac4a0 is described below
commit 16ac4a026dc17dcdf6bb5fe2250af6bf06e1cfbc
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Fri Jan 18 19:13:34 2019 +0100
Address minor review notes
---
.../translation/TranslationContext.java | 4 ++++
.../translation/batch/DoFnFunction.java | 16 ++++++++++++----
.../translation/batch/ParDoTranslatorBatch.java | 12 ++++++------
.../{SparkProcessContext.java => ProcessContext.java} | 12 ++----------
.../{SparkNoOpStepContext.java => NoOpStepContext.java} | 2 +-
5 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 75b470e..ca69261 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -109,6 +109,10 @@ public class TranslationContext {
return (Dataset<WindowedValue<T>>) dataset;
}
+ /**
+ * TODO: All three putDataset* methods are temporary and exist only to satisfy generic type
+ * checking. We should unify them in the future.
+ */
public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
if (!datasets.containsKey(value)) {
datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index 35204bc..62629ee 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Multimap;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +45,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+/**
+ * Encapsulates a {@link DoFn} inside a Spark {@link
+ * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ *
+ * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index and must tag
+ * all outputs with the output number. Afterwards, a filter removes those elements that do not
+ * belong to a specific output.
+ */
public class DoFnFunction<InputT, OutputT>
implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
@@ -98,18 +106,18 @@ public class DoFnFunction<InputT, OutputT>
outputManager,
mainOutputTag,
additionalOutputTags,
- new SparkNoOpStepContext(),
+ new NoOpStepContext(),
inputCoder,
outputCoderMap,
windowingStrategy);
- return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+ return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
.processPartition(iter)
.iterator();
}
private class DoFnOutputManager
- implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+ implements ProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 9cbde5a..f80db9a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -69,19 +69,19 @@ class ParDoTranslatorBatch<InputT, OutputT>
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
TupleTag<?> mainOutputTag = getTupleTag(context);
- Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+ Map<TupleTag<?>, Integer> outputTags = Maps.newHashMap();
- outputMap.put(mainOutputTag, 0);
+ outputTags.put(mainOutputTag, 0);
int count = 1;
for (TupleTag<?> tag : outputs.keySet()) {
- if (!outputMap.containsKey(tag)) {
- outputMap.put(tag, count++);
+ if (!outputTags.containsKey(tag)) {
+ outputTags.put(tag, count++);
}
}
// Union coder elements must match the order of the output tags.
Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
- for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
+ for (Map.Entry<TupleTag<?>, Integer> entry : outputTags.entrySet()) {
indexMap.put(entry.getValue(), entry.getKey());
}
@@ -126,7 +126,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
windowingStrategy,
sideInputStrategies,
context.getOptions(),
- outputMap,
+ outputTags,
mainOutputTag,
context.getInput(transform).getCoder(),
outputCoderMap);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
similarity index 91%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
index 720b7ab..f51bd80 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
@@ -31,14 +31,14 @@ import java.util.Iterator;
import static com.google.common.base.Preconditions.checkArgument;
/** Spark runner process context processes Spark partitions using Beam's {@link DoFnRunner}. */
-class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
+class ProcessContext<FnInputT, FnOutputT, OutputT> {
private final DoFn<FnInputT, FnOutputT> doFn;
private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
private final SparkOutputManager<OutputT> outputManager;
private Iterator<TimerInternals.TimerData> timerDataIterator;
- SparkProcessContext(
+ ProcessContext(
DoFn<FnInputT, FnOutputT> doFn,
DoFnRunner<FnInputT, FnOutputT> doFnRunner,
SparkOutputManager<OutputT> outputManager,
@@ -119,7 +119,6 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
doFnRunner.processElement(inputIterator.next());
outputIterator = getOutputIterator();
} else if (timerDataIterator.hasNext()) {
- fireTimer(timerDataIterator.next());
outputIterator = getOutputIterator();
} else {
// no more input to consume, but finishBundle can produce more output
@@ -138,12 +137,5 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
throw re;
}
}
-
- private void fireTimer(TimerInternals.TimerData timer) {
- StateNamespace namespace = timer.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
- BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
- }
}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
similarity index 95%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
index 889cdf5..25e6f11 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
/** A {@link StepContext} for Spark Batch Runner execution. */
-public class SparkNoOpStepContext implements StepContext {
+public class NoOpStepContext implements StepContext {
@Override
public StateInternals stateInternals() {