You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/15 11:55:23 UTC
[1/2] incubator-beam git commit: [BEAM-762] Unify spark-runner
EvaluationContext and StreamingEvaluationContext
Repository: incubator-beam
Updated Branches:
refs/heads/master 9c300cde8 -> 2bc66f903
[BEAM-762] Unify spark-runner EvaluationContext and StreamingEvaluationContext
PR 1291 review changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1bef01fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1bef01fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1bef01fe
Branch: refs/heads/master
Commit: 1bef01fef5ff5ff9a960c85b00c2cc4aa504ce4d
Parents: 9c300cd
Author: Aviem Zur <av...@gmail.com>
Authored: Sun Nov 13 13:57:07 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 13:35:49 2016 +0200
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 4 +-
.../spark/translation/BoundedDataset.java | 114 ++++++++
.../beam/runners/spark/translation/Dataset.java | 34 +++
.../spark/translation/EvaluationContext.java | 230 +++++++---------
.../spark/translation/TransformTranslator.java | 99 +++----
.../SparkRunnerStreamingContextFactory.java | 7 +-
.../streaming/StreamingEvaluationContext.java | 272 -------------------
.../streaming/StreamingTransformTranslator.java | 135 +++++----
.../translation/streaming/UnboundedDataset.java | 103 +++++++
9 files changed, 464 insertions(+), 534 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 45c7f55..6bbef39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,6 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* The SparkRunner translate operations defined on a pipeline to a representation
* executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
@@ -136,7 +136,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
jssc.start();
// if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
- return contextFactory.getCtxt() == null ? new StreamingEvaluationContext(jssc.sc(),
+ return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
} else {
if (mOptions.getTimeout() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
new file mode 100644
index 0000000..774efb9
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are sometimes
+ * created from a collection of objects (using RDD parallelize) and then only used to create View
+ * objects; in which case they do not need to be converted to bytes since they are not transferred
+ * across the network until they are broadcast.
+ */
+public class BoundedDataset<T> implements Dataset {
+ // only set if creating an RDD from a static collection
+ @Nullable private transient JavaSparkContext jsc;
+
+ private Iterable<WindowedValue<T>> windowedValues;
+ private Coder<T> coder;
+ private JavaRDD<WindowedValue<T>> rdd;
+
+ BoundedDataset(JavaRDD<WindowedValue<T>> rdd) {
+ this.rdd = rdd;
+ }
+
+ BoundedDataset(Iterable<T> values, JavaSparkContext jsc, Coder<T> coder) {
+ this.windowedValues =
+ Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
+ this.jsc = jsc;
+ this.coder = coder;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ public JavaRDD<WindowedValue<T>> getRDD() {
+ if (rdd == null) {
+ WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+ WindowedValue.getValueOnlyCoder(coder);
+ rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+ .map(CoderHelpers.fromByteFunction(windowCoder));
+ }
+ return rdd;
+ }
+
+ Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
+ if (windowedValues == null) {
+ WindowFn<?, ?> windowFn =
+ pcollection.getWindowingStrategy().getWindowFn();
+ Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
+ final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
+ if (windowFn instanceof GlobalWindows) {
+ windowedValueCoder =
+ WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
+ } else {
+ windowedValueCoder =
+ WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
+ }
+ JavaRDDLike<byte[], ?> bytesRDD =
+ rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
+ List<byte[]> clientBytes = bytesRDD.collect();
+ windowedValues = Iterables.transform(clientBytes,
+ new Function<byte[], WindowedValue<T>>() {
+ @Override
+ public WindowedValue<T> apply(byte[] bytes) {
+ return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
+ }
+ });
+ }
+ return windowedValues;
+ }
+
+ @Override
+ public void cache() {
+ rdd.cache();
+ }
+
+ @Override
+ public void action() {
+ rdd.count();
+ }
+
+ @Override
+ public void setName(String name) {
+ rdd.setName(name);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
new file mode 100644
index 0000000..36b03fe
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import java.io.Serializable;
+
+
+/**
+ * Holder for Spark RDD/DStream.
+ */
+public interface Dataset extends Serializable {
+
+ void cache();
+
+ void action();
+
+ void setName(String name);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 6ccec85..aaf7573 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -20,17 +20,15 @@ package org.apache.beam.runners.spark.translation;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
@@ -39,17 +37,15 @@ import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
@@ -58,91 +54,48 @@ import org.joda.time.Duration;
*/
public class EvaluationContext implements EvaluationResult {
private final JavaSparkContext jsc;
- private final Pipeline pipeline;
+ private JavaStreamingContext jssc;
private final SparkRuntimeContext runtime;
- private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
- private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
- private final Set<PValue> multireads = new LinkedHashSet<>();
+ private final Pipeline pipeline;
+ private long timeout;
+ private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
+ private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
+ private final Set<Dataset> leaves = new LinkedHashSet<>();
+ private final Set<PValue> multiReads = new LinkedHashSet<>();
private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
- protected AppliedPTransform<?, ?, ?> currentTransform;
+ private AppliedPTransform<?, ?, ?> currentTransform;
+ private State state;
public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
this.jsc = jsc;
this.pipeline = pipeline;
this.runtime = new SparkRuntimeContext(pipeline, jsc);
+ // A batch pipeline is blocking by nature
+ this.state = State.DONE;
}
- /**
- * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
- * sometimes created from a collection of objects (using RDD parallelize) and then
- * only used to create View objects; in which case they do not need to be
- * converted to bytes since they are not transferred across the network until they are
- * broadcast.
- */
- private class RDDHolder<T> {
-
- private Iterable<WindowedValue<T>> windowedValues;
- private Coder<T> coder;
- private JavaRDDLike<WindowedValue<T>, ?> rdd;
-
- RDDHolder(Iterable<T> values, Coder<T> coder) {
- this.windowedValues =
- Iterables.transform(values, WindowingHelpers.<T>windowValueFunction());
- this.coder = coder;
- }
-
- RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
- this.rdd = rdd;
- }
-
- JavaRDDLike<WindowedValue<T>, ?> getRDD() {
- if (rdd == null) {
- WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
- WindowedValue.getValueOnlyCoder(coder);
- rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
- .map(CoderHelpers.fromByteFunction(windowCoder));
- }
- return rdd;
- }
-
- Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
- if (windowedValues == null) {
- WindowFn<?, ?> windowFn =
- pcollection.getWindowingStrategy().getWindowFn();
- Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
- final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
- if (windowFn instanceof GlobalWindows) {
- windowedValueCoder =
- WindowedValue.ValueOnlyWindowedValueCoder.of(pcollection.getCoder());
- } else {
- windowedValueCoder =
- WindowedValue.FullWindowedValueCoder.of(pcollection.getCoder(), windowCoder);
- }
- JavaRDDLike<byte[], ?> bytesRDD =
- rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
- List<byte[]> clientBytes = bytesRDD.collect();
- windowedValues = Iterables.transform(clientBytes,
- new Function<byte[], WindowedValue<T>>() {
- @Override
- public WindowedValue<T> apply(byte[] bytes) {
- return CoderHelpers.fromByteArray(bytes, windowedValueCoder);
- }
- });
- }
- return windowedValues;
- }
+ public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+ JavaStreamingContext jssc, long timeout) {
+ this(jsc, pipeline);
+ this.jssc = jssc;
+ this.timeout = timeout;
+ this.state = State.RUNNING;
}
- protected JavaSparkContext getSparkContext() {
+ JavaSparkContext getSparkContext() {
return jsc;
}
+ public JavaStreamingContext getStreamingContext() {
+ return jssc;
+ }
+
public Pipeline getPipeline() {
return pipeline;
}
- protected SparkRuntimeContext getRuntimeContext() {
+ public SparkRuntimeContext getRuntimeContext() {
return runtime;
}
@@ -150,11 +103,7 @@ public class EvaluationContext implements EvaluationResult {
this.currentTransform = transform;
}
- protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
- return currentTransform;
- }
-
- protected <T extends PInput> T getInput(PTransform<T, ?> transform) {
+ public <T extends PInput> T getInput(PTransform<T, ?> transform) {
checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
@SuppressWarnings("unchecked")
@@ -162,7 +111,7 @@ public class EvaluationContext implements EvaluationResult {
return input;
}
- protected <T extends POutput> T getOutput(PTransform<?, T> transform) {
+ public <T extends POutput> T getOutput(PTransform<?, T> transform) {
checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
@SuppressWarnings("unchecked")
@@ -170,81 +119,74 @@ public class EvaluationContext implements EvaluationResult {
return output;
}
- protected <T> void setOutputRDD(PTransform<?, ?> transform,
- JavaRDDLike<WindowedValue<T>, ?> rdd) {
- setRDD((PValue) getOutput(transform), rdd);
+ public void putDataset(PTransform<?, ?> transform, Dataset dataset) {
+ putDataset((PValue) getOutput(transform), dataset);
+ }
+
+ public void putDataset(PValue pvalue, Dataset dataset) {
+ try {
+ dataset.setName(pvalue.getName());
+ } catch (IllegalStateException e) {
+ // name not set, ignore
+ }
+ datasets.put(pvalue, dataset);
+ leaves.add(dataset);
+ }
+
+ <T> void putBoundedDatasetFromValues(PTransform<?, ?> transform, Iterable<T> values,
+ Coder<T> coder) {
+ datasets.put((PValue) getOutput(transform), new BoundedDataset<>(values, jsc, coder));
}
- protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
- Coder<T> coder) {
- pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
+ public <T> void putUnboundedDatasetFromQueue(
+ PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+ datasets.put((PValue) getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
}
- public void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
+ void putPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
pview.put(view, value);
}
- protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
- PValue pvalue = (PValue) getOutput(transform);
- return pcollections.containsKey(pvalue);
+ public Dataset borrowDataset(PTransform<?, ?> transform) {
+ return borrowDataset((PValue) getInput(transform));
}
- public JavaRDDLike<?, ?> getRDD(PValue pvalue) {
- RDDHolder<?> rddHolder = pcollections.get(pvalue);
- JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
- leafRdds.remove(rddHolder);
- if (multireads.contains(pvalue)) {
+ public Dataset borrowDataset(PValue pvalue) {
+ Dataset dataset = datasets.get(pvalue);
+ leaves.remove(dataset);
+ if (multiReads.contains(pvalue)) {
// Ensure the RDD is marked as cached
- rdd.rdd().cache();
+ dataset.cache();
} else {
- multireads.add(pvalue);
+ multiReads.add(pvalue);
}
- return rdd;
+ return dataset;
}
- protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) {
- try {
- rdd.rdd().setName(pvalue.getName());
- } catch (IllegalStateException e) {
- // name not set, ignore
- }
- RDDHolder<T> rddHolder = new RDDHolder<>(rdd);
- pcollections.put(pvalue, rddHolder);
- leafRdds.add(rddHolder);
- }
-
- protected JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
- return getRDD((PValue) getInput(transform));
- }
-
-
<T> Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
return pview.get(view);
}
/**
- * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
- * actions (like saving to a file) registered on them (i.e. they are performed for side
- * effects).
+ * Computes the outputs for all RDDs that are leaves in the DAG and do not have any actions (like
+ * saving to a file) registered on them (i.e. they are performed for side effects).
*/
public void computeOutputs() {
- for (RDDHolder<?> rddHolder : leafRdds) {
- JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
- rdd.rdd().cache(); // cache so that any subsequent get() is cheap
- rdd.count(); // force the RDD to be computed
+ for (Dataset dataset : leaves) {
+ dataset.cache(); // cache so that any subsequent get() is cheap.
+ dataset.action(); // force computation.
}
}
+ @SuppressWarnings("unchecked")
@Override
public <T> T get(PValue value) {
if (pobjects.containsKey(value)) {
- @SuppressWarnings("unchecked")
T result = (T) pobjects.get(value);
return result;
}
if (pcollections.containsKey(value)) {
- JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
- @SuppressWarnings("unchecked")
+ JavaRDD<?> rdd = ((BoundedDataset) pcollections.get(value)).getRDD();
T res = (T) Iterables.getOnlyElement(rdd.collect());
pobjects.put(value, res);
return res;
@@ -271,27 +213,37 @@ public class EvaluationContext implements EvaluationResult {
@Override
public <T> Iterable<T> get(PCollection<T> pcollection) {
@SuppressWarnings("unchecked")
- RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
- Iterable<WindowedValue<T>> windowedValues = rddHolder.getValues(pcollection);
+ BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
+ Iterable<WindowedValue<T>> windowedValues = boundedDataset.getValues(pcollection);
return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
}
<T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
@SuppressWarnings("unchecked")
- RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
- return rddHolder.getValues(pcollection);
+ BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
+ return boundedDataset.getValues(pcollection);
}
@Override
public void close(boolean gracefully) {
- // graceful stop is used for streaming.
+ if (isStreamingPipeline()) {
+ // stop streaming context
+ if (timeout > 0) {
+ jssc.awaitTerminationOrTimeout(timeout);
+ } else {
+ jssc.awaitTermination();
+ }
+ // stop streaming context gracefully, so checkpointing (and other computations) get to
+ // finish before shutdown.
+ jssc.stop(false, gracefully);
+ }
+ state = State.DONE;
SparkContextFactory.stopSparkContext(jsc);
}
- /** The runner is blocking. */
@Override
public State getState() {
- return State.DONE;
+ return state;
}
@Override
@@ -307,9 +259,19 @@ public class EvaluationContext implements EvaluationResult {
@Override
public State waitUntilFinish(Duration duration) {
- // This is no-op, since Spark runner in batch is blocking.
- // It needs to be updated once SparkRunner supports non-blocking execution:
- // https://issues.apache.org/jira/browse/BEAM-595
- return State.DONE;
+ if (isStreamingPipeline()) {
+ throw new UnsupportedOperationException(
+ "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
+ + "pipelines.");
+ } else {
+ // This is no-op, since Spark runner in batch is blocking.
+ // It needs to be updated once SparkRunner supports non-blocking execution:
+ // https://issues.apache.org/jira/browse/BEAM-595
+ return State.DONE;
+ }
+ }
+
+ private boolean isStreamingPipeline() {
+ return jssc != null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 2e682c4..c902ee3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.beam.runners.spark.translation;
import static com.google.common.base.Preconditions.checkState;
@@ -73,11 +72,9 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
-
import scala.Tuple2;
@@ -101,11 +98,11 @@ public final class TransformTranslator {
} else {
JavaRDD<WindowedValue<T>>[] rdds = new JavaRDD[pcs.size()];
for (int i = 0; i < rdds.length; i++) {
- rdds[i] = (JavaRDD<WindowedValue<T>>) context.getRDD(pcs.get(i));
+ rdds[i] = ((BoundedDataset<T>) context.borrowDataset(pcs.get(i))).getRDD();
}
unionRDD = context.getSparkContext().union(rdds);
}
- context.setOutputRDD(transform, unionRDD);
+ context.putDataset(transform, new BoundedDataset<>(unionRDD));
}
};
}
@@ -116,7 +113,7 @@ public final class TransformTranslator {
public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<KV<K, V>>> inRDD =
- (JavaRDD<WindowedValue<KV<K, V>>>) context.getInputRDD(transform);
+ ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform)).getRDD();
@SuppressWarnings("unchecked")
final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
@@ -124,8 +121,9 @@ public final class TransformTranslator {
final Accumulator<NamedAggregators> accum =
AccumulatorSingleton.getInstance(context.getSparkContext());
- context.setOutputRDD(transform, GroupCombineFunctions.groupByKey(inRDD, accum, coder,
- context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy()));
+ context.putDataset(transform,
+ new BoundedDataset<>(GroupCombineFunctions.groupByKey(inRDD, accum, coder,
+ context.getRuntimeContext(), context.getInput(transform).getWindowingStrategy())));
}
};
}
@@ -146,16 +144,17 @@ public final class TransformTranslator {
CombineFnUtil.toFnWithContext(transform.getFn());
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
- (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>)
- context.getInputRDD(transform);
+ JavaRDD<WindowedValue<KV<K, Iterable<InputT>>>> inRDD =
+ ((BoundedDataset<KV<K, Iterable<InputT>>>)
+ context.borrowDataset(transform)).getRDD();
- SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
+ SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(),
TranslationUtils.getSideInputs(transform.getSideInputs(), context),
- windowingStrategy);
- context.setOutputRDD(transform, inRDD.map(new TranslationUtils.CombineGroupedValues<>(
- combineFnWithContext)));
+ windowingStrategy);
+ context.putDataset(transform, new BoundedDataset<>(inRDD.map(new TranslationUtils
+ .CombineGroupedValues<>(
+ combineFnWithContext))));
}
};
}
@@ -182,10 +181,11 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRdd =
- (JavaRDD<WindowedValue<InputT>>) context.getInputRDD(transform);
+ ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
- context.setOutputRDD(transform, GroupCombineFunctions.combineGlobally(inRdd, combineFn,
- iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault));
+ context.putDataset(transform, new BoundedDataset<>(GroupCombineFunctions
+ .combineGlobally(inRdd, combineFn,
+ iCoder, oCoder, runtimeContext, windowingStrategy, sideInputs, hasDefault)));
}
};
}
@@ -212,10 +212,11 @@ public final class TransformTranslator {
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<KV<K, InputT>>> inRdd =
- (JavaRDD<WindowedValue<KV<K, InputT>>>) context.getInputRDD(transform);
+ ((BoundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getRDD();
- context.setOutputRDD(transform, GroupCombineFunctions.combinePerKey(inRdd, combineFn,
- inputCoder, runtimeContext, windowingStrategy, sideInputs));
+ context.putDataset(transform, new BoundedDataset<>(GroupCombineFunctions
+ .combinePerKey(inRdd, combineFn,
+ inputCoder, runtimeContext, windowingStrategy, sideInputs)));
}
};
}
@@ -225,8 +226,8 @@ public final class TransformTranslator {
@Override
public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
- (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+ JavaRDD<WindowedValue<InputT>> inRDD =
+ ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@SuppressWarnings("unchecked")
final WindowFn<Object, ?> windowFn =
(WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
@@ -234,9 +235,9 @@ public final class TransformTranslator {
AccumulatorSingleton.getInstance(context.getSparkContext());
Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
- context.setOutputRDD(transform,
- inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(),
- context.getRuntimeContext(), sideInputs, windowFn)));
+ context.putDataset(transform,
+ new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, transform.getFn(),
+ context.getRuntimeContext(), sideInputs, windowFn))));
}
};
}
@@ -247,8 +248,8 @@ public final class TransformTranslator {
@Override
public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
- (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
+ JavaRDD<WindowedValue<InputT>> inRDD =
+ ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@SuppressWarnings("unchecked")
final WindowFn<Object, ?> windowFn =
(WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
@@ -268,7 +269,7 @@ public final class TransformTranslator {
// Object is the best we can do since different outputs can have different tags
JavaRDD<WindowedValue<Object>> values =
(JavaRDD<WindowedValue<Object>>) (JavaRDD<?>) filtered.values();
- context.setRDD(e.getValue(), values);
+ context.putDataset(e.getValue(), new BoundedDataset<>(values));
}
}
};
@@ -281,8 +282,8 @@ public final class TransformTranslator {
public void evaluate(TextIO.Read.Bound<T> transform, EvaluationContext context) {
String pattern = transform.getFilepattern();
JavaRDD<WindowedValue<String>> rdd = context.getSparkContext().textFile(pattern)
- .map(WindowingHelpers.<String>windowFunction());
- context.setOutputRDD(transform, rdd);
+ .map(WindowingHelpers.<String>windowFunction());
+ context.putDataset(transform, new BoundedDataset<>(rdd));
}
};
}
@@ -293,7 +294,7 @@ public final class TransformTranslator {
public void evaluate(TextIO.Write.Bound<T> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
JavaPairRDD<T, Void> last =
- ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+ ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
.map(WindowingHelpers.<T>unwindowFunction())
.mapToPair(new PairFunction<T, T,
Void>() {
@@ -331,7 +332,7 @@ public final class TransformTranslator {
return key.datum();
}
}).map(WindowingHelpers.<T>windowFunction());
- context.setOutputRDD(transform, rdd);
+ context.putDataset(transform, new BoundedDataset<>(rdd));
}
};
}
@@ -349,7 +350,7 @@ public final class TransformTranslator {
AvroJob.setOutputKeySchema(job, transform.getSchema());
@SuppressWarnings("unchecked")
JavaPairRDD<AvroKey<T>, NullWritable> last =
- ((JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform))
+ ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD()
.map(WindowingHelpers.<T>unwindowFunction())
.mapToPair(new PairFunction<T, AvroKey<T>, NullWritable>() {
@Override
@@ -377,7 +378,7 @@ public final class TransformTranslator {
JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
jsc.sc(), transform.getSource(), runtimeContext).toJavaRDD();
// cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
- context.setOutputRDD(transform, input.cache());
+ context.putDataset(transform, new BoundedDataset<>(input.cache()));
}
};
}
@@ -388,7 +389,7 @@ public final class TransformTranslator {
public void evaluate(HadoopIO.Read.Bound<K, V> transform, EvaluationContext context) {
String pattern = transform.getFilepattern();
JavaSparkContext jsc = context.getSparkContext();
- @SuppressWarnings ("unchecked")
+ @SuppressWarnings("unchecked")
JavaPairRDD<K, V> file = jsc.newAPIHadoopFile(pattern,
transform.getFormatClass(),
transform.getKeyClass(), transform.getValueClass(),
@@ -400,7 +401,7 @@ public final class TransformTranslator {
return KV.of(t2._1(), t2._2());
}
}).map(WindowingHelpers.<KV<K, V>>windowFunction());
- context.setOutputRDD(transform, rdd);
+ context.putDataset(transform, new BoundedDataset<>(rdd));
}
};
}
@@ -410,8 +411,8 @@ public final class TransformTranslator {
@Override
public void evaluate(HadoopIO.Write.Bound<K, V> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
- JavaPairRDD<K, V> last = ((JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context
- .getInputRDD(transform))
+ JavaPairRDD<K, V> last = ((BoundedDataset<KV<K, V>>) context.borrowDataset(transform))
+ .getRDD()
.map(WindowingHelpers.<KV<K, V>>unwindowFunction())
.mapToPair(new PairFunction<KV<K, V>, K, V>() {
@Override
@@ -492,20 +493,20 @@ public final class TransformTranslator {
@Override
public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<T>, ?> inRDD =
- (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
+ JavaRDD<WindowedValue<T>> inRDD =
+ ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD();
if (TranslationUtils.skipAssignWindows(transform, context)) {
- context.setOutputRDD(transform, inRDD);
+ context.putDataset(transform, new BoundedDataset<>(inRDD));
} else {
@SuppressWarnings("unchecked")
WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
Accumulator<NamedAggregators> accum =
AccumulatorSingleton.getInstance(context.getSparkContext());
- context.setOutputRDD(transform,
- inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn,
- context.getRuntimeContext(), null, null)));
+ context.putDataset(transform,
+ new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(accum, addWindowsDoFn,
+ context.getRuntimeContext(), null, null))));
}
}
};
@@ -519,7 +520,7 @@ public final class TransformTranslator {
// Use a coder to convert the objects in the PCollection to byte arrays, so they
// can be transferred over the network.
Coder<T> coder = context.getOutput(transform).getCoder();
- context.setOutputRDDFromValues(transform, elems, coder);
+ context.putBoundedDatasetFromValues(transform, elems, coder);
}
};
}
@@ -530,7 +531,7 @@ public final class TransformTranslator {
public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
+ context.putPView(context.getOutput(transform), iter);
}
};
}
@@ -541,7 +542,7 @@ public final class TransformTranslator {
public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
+ context.putPView(context.getOutput(transform), iter);
}
};
}
@@ -554,7 +555,7 @@ public final class TransformTranslator {
EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- context.setPView(context.getOutput(transform), iter);
+ context.putPView(context.getOutput(transform), iter);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index f8ee8ad..01398e4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
@@ -54,7 +55,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
this.options = options;
}
- private StreamingEvaluationContext ctxt;
+ private EvaluationContext ctxt;
@Override
public JavaStreamingContext create() {
@@ -72,7 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
- ctxt = new StreamingEvaluationContext(jsc, pipeline, jssc,
+ ctxt = new EvaluationContext(jsc, pipeline, jssc,
options.getTimeout());
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
ctxt.computeOutputs();
@@ -95,7 +96,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
return jssc;
}
- public StreamingEvaluationContext getCtxt() {
+ public EvaluationContext getCtxt() {
return ctxt;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
deleted file mode 100644
index bfba316..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.translation.streaming;
-
-
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
-import org.apache.beam.runners.spark.translation.EvaluationContext;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.joda.time.Duration;
-
-
-/**
- * Streaming evaluation context helps to handle streaming.
- */
-public class StreamingEvaluationContext extends EvaluationContext {
-
- private final JavaStreamingContext jssc;
- private final long timeout;
- private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
- private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
-
- public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
- JavaStreamingContext jssc, long timeout) {
- super(jsc, pipeline);
- this.jssc = jssc;
- this.timeout = timeout;
- }
-
- /**
- * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for
- * testing.
- */
- private class DStreamHolder<T> {
-
- private Iterable<Iterable<T>> values;
- private Coder<T> coder;
- private JavaDStream<WindowedValue<T>> dStream;
-
- DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
- this.values = values;
- this.coder = coder;
- }
-
- DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
- this.dStream = dStream;
- }
-
- @SuppressWarnings("unchecked")
- JavaDStream<WindowedValue<T>> getDStream() {
- if (dStream == null) {
- WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
- WindowedValue.getValueOnlyCoder(coder);
- // create the DStream from queue
- Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
- JavaRDD<WindowedValue<T>> lastRDD = null;
- for (Iterable<T> v : values) {
- Iterable<WindowedValue<T>> windowedValues =
- Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
- JavaRDD<WindowedValue<T>> rdd = getSparkContext().parallelize(
- CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
- CoderHelpers.fromByteFunction(windowCoder));
- rddQueue.offer(rdd);
- lastRDD = rdd;
- }
- // create dstream from queue, one at a time,
- // with last as default in case batches repeat (graceful stops for example).
- // if the stream is empty, avoid creating a default empty RDD.
- // mainly for unit test so no reason to have this configurable.
- dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
- : jssc.queueStream(rddQueue, true);
- }
- return dStream;
- }
- }
-
- <T> void setDStreamFromQueue(
- PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
- pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
- }
-
- <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
- setStream((PValue) getOutput(transform), dStream);
- }
-
- <T> void setStream(PValue pvalue, JavaDStream<WindowedValue<T>> dStream) {
- DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
- pstreams.put(pvalue, dStreamHolder);
- leafStreams.add(dStreamHolder);
- }
-
- boolean hasStream(PTransform<?, ?> transform) {
- PValue pvalue = (PValue) getInput(transform);
- return hasStream(pvalue);
- }
-
- boolean hasStream(PValue pvalue) {
- return pstreams.containsKey(pvalue);
- }
-
- JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
- return getStream((PValue) getInput(transform));
- }
-
- JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
- DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
- JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
- leafStreams.remove(dStreamHolder);
- return dStream;
- }
-
- // used to set the RDD from the DStream in the RDDHolder for transformation
- <T> void setInputRDD(
- PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
- setRDD((PValue) getInput(transform), rdd);
- }
-
- // used to get the RDD transformation output and use it as the DStream transformation output
- JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
- return getRDD((PValue) getOutput(transform));
- }
-
- public JavaStreamingContext getStreamingContext() {
- return jssc;
- }
-
- @Override
- public void computeOutputs() {
- super.computeOutputs(); // in case the pipeline contains bounded branches as well.
- for (DStreamHolder<?> streamHolder : leafStreams) {
- computeOutput(streamHolder);
- } // force a DStream action
- }
-
- private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
- JavaDStream<WindowedValue<T>> dStream = streamHolder.getDStream();
- // cache in DStream level not RDD
- // because there could be a difference in StorageLevel if the DStream is windowed.
- dStream.dstream().cache();
- dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
- @Override
- public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
- rdd.count();
- }
- });
- }
-
- @Override
- public void close(boolean gracefully) {
- if (timeout > 0) {
- jssc.awaitTerminationOrTimeout(timeout);
- } else {
- jssc.awaitTermination();
- }
- // stop streaming context gracefully, so checkpointing (and other computations) get to
- // finish before shutdown.
- jssc.stop(false, gracefully);
- state = State.DONE;
- super.close(false);
- }
-
- private State state = State.RUNNING;
-
- @Override
- public State getState() {
- return state;
- }
-
- @Override
- public State cancel() throws IOException {
- throw new UnsupportedOperationException(
- "Spark runner StreamingEvaluationContext does not support cancel.");
- }
-
- @Override
- public State waitUntilFinish() {
- throw new UnsupportedOperationException(
- "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
- }
-
- @Override
- public State waitUntilFinish(Duration duration) {
- throw new UnsupportedOperationException(
- "Spark runner StreamingEvaluationContext does not support waitUntilFinish.");
- }
-
- //---------------- override in order to expose in package
- @Override
- protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
- return super.getInput(transform);
- }
- @Override
- protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
- return super.getOutput(transform);
- }
-
- @Override
- protected JavaSparkContext getSparkContext() {
- return super.getSparkContext();
- }
-
- @Override
- protected SparkRuntimeContext getRuntimeContext() {
- return super.getRuntimeContext();
- }
-
- @Override
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
- super.setCurrentTransform(transform);
- }
-
- @Override
- protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
- return super.getCurrentTransform();
- }
-
- @Override
- protected <T> void setOutputRDD(PTransform<?, ?> transform,
- JavaRDDLike<WindowedValue<T>, ?> rdd) {
- super.setOutputRDD(transform, rdd);
- }
-
- @Override
- protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
- Coder<T> coder) {
- super.setOutputRDDFromValues(transform, values, coder);
- }
-
- @Override
- protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
- return super.hasOutputRDD(transform);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 71c27df..b30f079 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -29,6 +29,8 @@ import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.SparkUnboundedSource;
+import org.apache.beam.runners.spark.translation.BoundedDataset;
+import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
@@ -71,15 +73,13 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairDStream;
-
/**
* Supports translation between a Beam transform, and Spark's operations on DStreams.
*/
-public final class StreamingTransformTranslator {
+final class StreamingTransformTranslator {
private StreamingTransformTranslator() {
}
@@ -89,9 +89,8 @@ public final class StreamingTransformTranslator {
@Override
public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
- JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
- (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
- ((StreamingEvaluationContext) context).getStream(transform);
+ JavaDStream<WindowedValue<T>> dstream =
+ ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream();
dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
}
};
@@ -101,9 +100,9 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<Read.Unbounded<T>>() {
@Override
public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- sec.setStream(transform, SparkUnboundedSource.read(sec.getStreamingContext(),
- sec.getRuntimeContext(), transform.getSource()));
+ context.putDataset(transform,
+ new UnboundedDataset<>(SparkUnboundedSource.read(context.getStreamingContext(),
+ context.getRuntimeContext(), transform.getSource())));
}
};
}
@@ -112,10 +111,9 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
@Override
public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
Iterable<Iterable<T>> values = transform.getQueuedValues();
- Coder<T> coder = sec.getOutput(transform).getCoder();
- sec.setDStreamFromQueue(transform, values, coder);
+ Coder<T> coder = context.getOutput(transform).getCoder();
+ context.putUnboundedDatasetFromQueue(transform, values, coder);
}
};
}
@@ -125,23 +123,23 @@ public final class StreamingTransformTranslator {
@SuppressWarnings("unchecked")
@Override
public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- PCollectionList<T> pcs = sec.getInput(transform);
+ PCollectionList<T> pcs = context.getInput(transform);
// since this is a streaming pipeline, at least one of the PCollections to "flatten" are
// unbounded, meaning it represents a DStream.
// So we could end up with an unbounded unified DStream.
final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
- for (PCollection<T> pcol: pcs.getAll()) {
- if (sec.hasStream(pcol)) {
- dStreams.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcol));
+ for (PCollection<T> pcol : pcs.getAll()) {
+ Dataset dataset = context.borrowDataset(pcol);
+ if (dataset instanceof UnboundedDataset) {
+ dStreams.add(((UnboundedDataset<T>) dataset).getDStream());
} else {
- rdds.add((JavaRDD<WindowedValue<T>>) context.getRDD(pcol));
+ rdds.add(((BoundedDataset<T>) dataset).getRDD());
}
}
// start by unifying streams into a single stream.
JavaDStream<WindowedValue<T>> unifiedStreams =
- sec.getStreamingContext().union(dStreams.remove(0), dStreams);
+ context.getStreamingContext().union(dStreams.remove(0), dStreams);
// now unify in RDDs.
if (rdds.size() > 0) {
JavaDStream<WindowedValue<T>> joined = unifiedStreams.transform(
@@ -152,9 +150,9 @@ public final class StreamingTransformTranslator {
return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
}
});
- sec.setStream(transform, joined);
+ context.putDataset(transform, new UnboundedDataset<>(joined));
} else {
- sec.setStream(transform, unifiedStreams);
+ context.putDataset(transform, new UnboundedDataset<>(unifiedStreams));
}
}
};
@@ -164,12 +162,11 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<Window.Bound<T>>() {
@Override
public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
@SuppressWarnings("unchecked")
WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<T>> dStream =
- (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+ ((UnboundedDataset<T>) context.borrowDataset(transform)).getDStream();
// get the right window durations.
Duration windowDuration;
Duration slideDuration;
@@ -188,10 +185,10 @@ public final class StreamingTransformTranslator {
dStream.window(windowDuration, slideDuration);
//--- then we apply windowing to the elements
if (TranslationUtils.skipAssignWindows(transform, context)) {
- sec.setStream(transform, windowedDStream);
+ context.putDataset(transform, new UnboundedDataset<>(windowedDStream));
} else {
final OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
@Override
@@ -202,7 +199,7 @@ public final class StreamingTransformTranslator {
new DoFnFunction<>(accum, addWindowsDoFn, runtimeContext, null, null));
}
});
- sec.setStream(transform, outStream);
+ context.putDataset(transform, new UnboundedDataset<>(outStream));
}
}
};
@@ -212,18 +209,16 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<GroupByKey<K, V>>() {
@Override
public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<KV<K, V>>> dStream =
- (JavaDStream<WindowedValue<KV<K, V>>>) sec.getStream(transform);
+ ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform)).getDStream();
@SuppressWarnings("unchecked")
- final KvCoder<K, V> coder = (KvCoder<K, V>) sec.getInput(transform).getCoder();
+ final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final WindowingStrategy<?, ?> windowingStrategy =
- sec.getInput(transform).getWindowingStrategy();
+ context.getInput(transform).getWindowingStrategy();
JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =
dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
@@ -237,7 +232,7 @@ public final class StreamingTransformTranslator {
windowingStrategy);
}
});
- sec.setStream(transform, outStream);
+ context.putDataset(transform, new UnboundedDataset<>(outStream));
}
};
}
@@ -245,29 +240,29 @@ public final class StreamingTransformTranslator {
private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
combineGrouped() {
return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
+ @SuppressWarnings("unchecked")
@Override
public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
// get the applied combine function.
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input =
- sec.getInput(transform);
+ context.getInput(transform);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- @SuppressWarnings("unchecked")
final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
(CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
- @SuppressWarnings("unchecked")
JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
- (JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>>) sec.getStream(transform);
+ ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform))
+ .getDStream();
- SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
- new SparkKeyedCombineFn<>(fn, sec.getRuntimeContext(),
+ SparkKeyedCombineFn<K, InputT, ?, OutputT> combineFnWithContext =
+ new SparkKeyedCombineFn<>(fn, context.getRuntimeContext(),
TranslationUtils.getSideInputs(transform.getSideInputs(), context),
- windowingStrategy);
- sec.setStream(transform, dStream.map(new TranslationUtils.CombineGroupedValues<>(
- combineFnWithContext)));
+ windowingStrategy);
+ context.putDataset(transform, new UnboundedDataset<>(dStream.map(new TranslationUtils
+ .CombineGroupedValues<>(
+ combineFnWithContext))));
}
};
}
@@ -276,26 +271,24 @@ public final class StreamingTransformTranslator {
combineGlobally() {
return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
+ @SuppressWarnings("unchecked")
@Override
public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- final PCollection<InputT> input = sec.getInput(transform);
+ final PCollection<InputT> input = context.getInput(transform);
// serializable arguments to pass.
- final Coder<InputT> iCoder = sec.getInput(transform).getCoder();
- final Coder<OutputT> oCoder = sec.getOutput(transform).getCoder();
- @SuppressWarnings("unchecked")
+ final Coder<InputT> iCoder = context.getInput(transform).getCoder();
+ final Coder<OutputT> oCoder = context.getOutput(transform).getCoder();
final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn =
(CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
final boolean hasDefault = transform.isInsertDefault();
- @SuppressWarnings("unchecked")
JavaDStream<WindowedValue<InputT>> dStream =
- (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+ ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(
new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
@@ -307,7 +300,7 @@ public final class StreamingTransformTranslator {
}
});
- sec.setStream(transform, outStream);
+ context.putDataset(transform, new UnboundedDataset<>(outStream));
}
};
}
@@ -315,27 +308,24 @@ public final class StreamingTransformTranslator {
private static <K, InputT, AccumT, OutputT>
TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
+ @SuppressWarnings("unchecked")
@Override
public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform,
final EvaluationContext context) {
- StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- final PCollection<KV<K, InputT>> input = sec.getInput(transform);
+ final PCollection<KV<K, InputT>> input = context.getInput(transform);
// serializable arguments to pass.
- @SuppressWarnings("unchecked")
final KvCoder<K, InputT> inputCoder =
- (KvCoder<K, InputT>) sec.getInput(transform).getCoder();
- @SuppressWarnings("unchecked")
+ (KvCoder<K, InputT>) context.getInput(transform).getCoder();
final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn =
(CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>)
CombineFnUtil.toFnWithContext(transform.getFn());
final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
- @SuppressWarnings("unchecked")
JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
- (JavaDStream<WindowedValue<KV<K, InputT>>>) sec.getStream(transform);
+ ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, InputT>>>,
@@ -347,26 +337,24 @@ public final class StreamingTransformTranslator {
windowingStrategy, sideInputs);
}
});
- sec.setStream(transform, outStream);
+ context.putDataset(transform, new UnboundedDataset<>(outStream));
}
};
}
private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
+ @SuppressWarnings("unchecked")
@Override
public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
final EvaluationContext context) {
- final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
- @SuppressWarnings("unchecked")
final WindowFn<Object, ?> windowFn =
- (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn();
- @SuppressWarnings("unchecked")
+ (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
JavaDStream<WindowedValue<InputT>> dStream =
- (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+ ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
JavaDStream<WindowedValue<OutputT>> outStream =
dStream.transform(new Function<JavaRDD<WindowedValue<InputT>>,
@@ -381,7 +369,7 @@ public final class StreamingTransformTranslator {
}
});
- sec.setStream(transform, outStream);
+ context.putDataset(transform, new UnboundedDataset<>(outStream));
}
};
}
@@ -392,16 +380,15 @@ public final class StreamingTransformTranslator {
@Override
public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
final EvaluationContext context) {
- final StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
+ final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
TranslationUtils.getSideInputs(transform.getSideInputs(), context);
@SuppressWarnings("unchecked")
final WindowFn<Object, ?> windowFn =
- (WindowFn<Object, ?>) sec.getInput(transform).getWindowingStrategy().getWindowFn();
+ (WindowFn<Object, ?>) context.getInput(transform).getWindowingStrategy().getWindowFn();
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<InputT>> dStream =
- (JavaDStream<WindowedValue<InputT>>) sec.getStream(transform);
+ ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair(
new Function<JavaRDD<WindowedValue<InputT>>,
JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
@@ -414,7 +401,7 @@ public final class StreamingTransformTranslator {
runtimeContext, transform.getMainOutputTag(), sideInputs, windowFn));
}
}).cache();
- PCollectionTuple pct = sec.getOutput(transform);
+ PCollectionTuple pct = context.getOutput(transform);
for (Map.Entry<TupleTag<?>, PCollection<?>> e : pct.getAll().entrySet()) {
@SuppressWarnings("unchecked")
JavaPairDStream<TupleTag<?>, WindowedValue<?>> filtered =
@@ -424,7 +411,7 @@ public final class StreamingTransformTranslator {
JavaDStream<WindowedValue<Object>> values =
(JavaDStream<WindowedValue<Object>>)
(JavaDStream<?>) TranslationUtils.dStreamValues(filtered);
- sec.setStream(e.getValue(), values);
+ context.putDataset(e.getValue(), new UnboundedDataset<>(values));
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1bef01fe/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
new file mode 100644
index 0000000..67adee2
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming;
+
+import com.google.common.collect.Iterables;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.Dataset;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+/**
+ * DStream holder. Can also create a DStream from a supplied queue of values, but mainly for testing.
+ */
+public class UnboundedDataset<T> implements Dataset {
+ // only set if creating a DStream from a static collection
+ @Nullable private transient JavaStreamingContext jssc;
+
+ private Iterable<Iterable<T>> values;
+ private Coder<T> coder;
+ private JavaDStream<WindowedValue<T>> dStream;
+
+ UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
+ this.dStream = dStream;
+ }
+
+ public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
+ this.values = values;
+ this.jssc = jssc;
+ this.coder = coder;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ JavaDStream<WindowedValue<T>> getDStream() {
+ if (dStream == null) {
+ WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+ WindowedValue.getValueOnlyCoder(coder);
+ // create the DStream from queue
+ Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+ JavaRDD<WindowedValue<T>> lastRDD = null;
+ for (Iterable<T> v : values) {
+ Iterable<WindowedValue<T>> windowedValues =
+ Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
+ JavaRDD<WindowedValue<T>> rdd = jssc.sc().parallelize(
+ CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
+ CoderHelpers.fromByteFunction(windowCoder));
+ rddQueue.offer(rdd);
+ lastRDD = rdd;
+ }
+ // create DStream from queue, one at a time,
+ // with last as default in case batches repeat (graceful stops for example).
+ // if the stream is empty, avoid creating a default empty RDD.
+ // mainly for unit tests, so there is no reason to make this configurable.
+ dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
+ : jssc.queueStream(rddQueue, true);
+ }
+ return dStream;
+ }
+
+ @Override
+ public void cache() {
+ dStream.cache();
+ }
+
+ @Override
+ public void action() {
+ dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
+ @Override
+ public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+ rdd.count();
+ }
+ });
+ }
+
+ @Override
+ public void setName(String name) {
+ // ignore
+ }
+}
[2/2] incubator-beam git commit: This closes #1291
Posted by am...@apache.org.
This closes #1291
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2bc66f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2bc66f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2bc66f90
Branch: refs/heads/master
Commit: 2bc66f903cdfa328c4bb3546befbaa0f58bdd6fa
Parents: 9c300cd 1bef01f
Author: Sela <an...@paypal.com>
Authored: Tue Nov 15 13:37:20 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 13:37:20 2016 +0200
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 4 +-
.../spark/translation/BoundedDataset.java | 114 ++++++++
.../beam/runners/spark/translation/Dataset.java | 34 +++
.../spark/translation/EvaluationContext.java | 230 +++++++---------
.../spark/translation/TransformTranslator.java | 99 +++----
.../SparkRunnerStreamingContextFactory.java | 7 +-
.../streaming/StreamingEvaluationContext.java | 272 -------------------
.../streaming/StreamingTransformTranslator.java | 135 +++++----
.../translation/streaming/UnboundedDataset.java | 103 +++++++
9 files changed, 464 insertions(+), 534 deletions(-)
----------------------------------------------------------------------