Posted to commits@beam.apache.org by jb...@apache.org on 2017/03/23 16:29:49 UTC
[1/2] beam git commit: [BEAM-649] Analyse DAG to determine if RDD/DStream has to be cached or not
Repository: beam
Updated Branches:
refs/heads/master c045b0ec2 -> 82b7b8613
[BEAM-649] Analyse DAG to determine if RDD/DStream has to be cached or not
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/daa10ddb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/daa10ddb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/daa10ddb
Branch: refs/heads/master
Commit: daa10ddbf7c1cc07962ab1bced0f14485f3739cb
Parents: 9ac1ffc
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Mar 2 17:28:50 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Mar 23 17:26:11 2017 +0100
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 64 ++++++++++++++++++--
.../spark/translation/BoundedDataset.java | 3 +-
.../spark/translation/EvaluationContext.java | 61 ++++++++++++-------
.../SparkRunnerStreamingContextFactory.java | 4 ++
.../apache/beam/runners/spark/CacheTest.java | 61 +++++++++++++++++++
.../spark/translation/StorageLevelTest.java | 6 +-
6 files changed, 168 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
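The heart of the change is a reference count over the pipeline DAG: every transform visit bumps a counter for each PCollection it consumes, and any collection with more than one consumer becomes a cache candidate. A minimal, self-contained sketch of that bookkeeping (a hypothetical class, not part of the patch; the real logic lives in CacheVisitor and EvaluationContext below):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the counting performed by SparkRunner.CacheVisitor. */
class CacheCandidateCounter {

  private final Map<String, Long> cacheCandidates = new HashMap<>();

  /** Called once per transform node with the ids of its input collections. */
  void visitNode(List<String> inputIds) {
    for (String inputId : inputIds) {
      Long count = cacheCandidates.get(inputId);
      cacheCandidates.put(inputId, count == null ? 1L : count + 1);
    }
  }

  /** A collection consumed more than once should be cached. */
  boolean shouldCache(String collectionId) {
    Long count = cacheCandidates.get(collectionId);
    return count != null && count > 1;
  }
}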
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index de648fc..fc5d4af 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -38,6 +38,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
@@ -90,6 +91,7 @@ import org.slf4j.LoggerFactory;
public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
+
/**
* Options used in this pipeline runner.
*/
@@ -143,10 +145,14 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
final SparkPipelineResult result;
final Future<?> startPipeline;
+
+ final SparkPipelineTranslator translator;
+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
MetricsEnvironment.setMetricsSupported(true);
+ // visit the pipeline to determine the translation mode
detectTranslationMode(pipeline);
if (mOptions.isStreaming()) {
@@ -157,6 +163,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
contextFactory);
+ // update cache candidates
+ translator = new StreamingTransformTranslator.Translator(
+ new TransformTranslator.Translator());
+ updateCacheCandidates(pipeline, translator, contextFactory.getEvaluationContext());
+
// Checkpoint aggregator/metrics values
jssc.addStreamingListener(
new JavaStreamingListenerWrapper(
@@ -191,8 +202,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
} else {
+ // create the evaluation context
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
+ translator = new TransformTranslator.Translator();
+
+ // update the cache candidates
+ updateCacheCandidates(pipeline, translator, evaluationContext);
initAccumulators(mOptions, jsc);
@@ -200,8 +216,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@Override
public void run() {
- pipeline.traverseTopologically(new Evaluator(new TransformTranslator.Translator(),
- evaluationContext));
+ pipeline.traverseTopologically(new Evaluator(translator, evaluationContext));
evaluationContext.computeOutputs();
LOG.info("Batch pipeline execution complete.");
}
@@ -240,9 +255,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
/**
- * Detect the translation mode for the pipeline and change options in case streaming
- * translation is needed.
- * @param pipeline
+ * Visit the pipeline to determine the translation mode (batch/streaming).
*/
private void detectTranslationMode(Pipeline pipeline) {
TranslationModeDetector detector = new TranslationModeDetector();
@@ -254,6 +267,17 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
/**
+ * Evaluator that updates/populates the cache candidates.
+ */
+ private void updateCacheCandidates(
+ Pipeline pipeline,
+ SparkPipelineTranslator translator,
+ EvaluationContext evaluationContext) {
+ CacheVisitor cacheVisitor = new CacheVisitor(translator, evaluationContext);
+ pipeline.traverseTopologically(cacheVisitor);
+ }
+
+ /**
* The translation mode of the Beam Pipeline.
*/
enum TranslationMode {
@@ -298,6 +322,36 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
/**
+ * Traverses the pipeline to populate the candidates for caching.
+ */
+ static class CacheVisitor extends Evaluator {
+
+ protected CacheVisitor(
+ SparkPipelineTranslator translator,
+ EvaluationContext evaluationContext) {
+ super(translator, evaluationContext);
+ }
+
+ @Override
+ public void doVisitTransform(TransformHierarchy.Node node) {
+ // we populate the cache candidates by updating the map with the inputs of each node.
+ // The goal is to detect the PCollections accessed more than once, so that caching can be
+ // enabled on the underlying RDDs or DStreams.
+
+ for (TaggedPValue input : node.getInputs()) {
+ PValue value = input.getValue();
+ if (value instanceof PCollection) {
+ long count = 1L;
+ if (ctxt.getCacheCandidates().get(value) != null) {
+ count = ctxt.getCacheCandidates().get(value) + 1;
+ }
+ ctxt.getCacheCandidates().put((PCollection) value, count);
+ }
+ }
+ }
+ }
+
+ /**
* Evaluator on the pipeline.
*/
@SuppressWarnings("WeakerAccess")
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 6e4ffc7..652c753 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -99,7 +99,8 @@ public class BoundedDataset<T> implements Dataset {
@Override
public void cache(String storageLevel) {
- rdd.persist(StorageLevel.fromString(storageLevel));
+ // populate the rdd if needed
+ getRDD().persist(StorageLevel.fromString(storageLevel));
}
@Override
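For context, a BoundedDataset built from in-memory values only materializes its RDD on first access, so persisting the raw rdd field could hit a null reference; going through the accessor populates it first. A hedged, Beam-free sketch of that lazy pattern:

import java.util.function.Supplier;

/** Sketch only: lazy materialization, so cache() is safe before first use. */
class LazyValue<T> {

  private final Supplier<T> builder;
  private T value; // plays the role of BoundedDataset's rdd field

  LazyValue(Supplier<T> builder) {
    this.builder = builder;
  }

  /** Materialize on first access, mirroring what getRDD() does above. */
  T get() {
    if (value == null) {
      value = builder.get();
    }
    return value;
  }
}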
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 329e047..643749d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -21,12 +21,14 @@ package org.apache.beam.runners.spark.translation;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Iterables;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -52,10 +54,10 @@ public class EvaluationContext {
private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
private final Set<Dataset> leaves = new LinkedHashSet<>();
- private final Set<PValue> multiReads = new LinkedHashSet<>();
private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
private AppliedPTransform<?, ?, ?> currentTransform;
private final SparkPCollectionView pviews = new SparkPCollectionView();
+ private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
this.jsc = jsc;
@@ -116,6 +118,15 @@ public class EvaluationContext {
return currentTransform.getOutputs();
}
+ private boolean shouldCache(PValue pvalue) {
+ if ((pvalue instanceof PCollection)
+ && cacheCandidates.containsKey(pvalue)
+ && cacheCandidates.get(pvalue) > 1) {
+ return true;
+ }
+ return false;
+ }
+
public void putDataset(PTransform<?, ? extends PValue> transform, Dataset dataset) {
putDataset(getOutput(transform), dataset);
}
@@ -126,13 +137,30 @@ public class EvaluationContext {
} catch (IllegalStateException e) {
// name not set, ignore
}
+ if (shouldCache(pvalue)) {
+ dataset.cache(storageLevel());
+ }
datasets.put(pvalue, dataset);
leaves.add(dataset);
}
<T> void putBoundedDatasetFromValues(
PTransform<?, ? extends PValue> transform, Iterable<T> values, Coder<T> coder) {
- datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc, coder));
+ PValue output = getOutput(transform);
+ if (shouldCache(output)) {
+ // eagerly create the RDD, as it will be reused.
+ Iterable<WindowedValue<T>> elems = Iterables.transform(values,
+ WindowingHelpers.<T>windowValueFunction());
+ WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+ WindowedValue.getValueOnlyCoder(coder);
+ JavaRDD<WindowedValue<T>> rdd =
+ getSparkContext().parallelize(CoderHelpers.toByteArrays(elems, windowCoder))
+ .map(CoderHelpers.fromByteFunction(windowCoder));
+ putDataset(transform, new BoundedDataset<>(rdd));
+ } else {
+ // create a BoundedDataset that will create an RDD on demand
+ datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc, coder));
+ }
}
public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
@@ -142,12 +170,6 @@ public class EvaluationContext {
public Dataset borrowDataset(PValue pvalue) {
Dataset dataset = datasets.get(pvalue);
leaves.remove(dataset);
- if (multiReads.contains(pvalue)) {
- // Ensure the RDD is marked as cached
- dataset.cache(storageLevel());
- } else {
- multiReads.add(pvalue);
- }
return dataset;
}
@@ -157,8 +179,6 @@ public class EvaluationContext {
*/
public void computeOutputs() {
for (Dataset dataset : leaves) {
- // cache so that any subsequent get() is cheap.
- dataset.cache(storageLevel());
dataset.action(); // force computation.
}
}
@@ -186,18 +206,6 @@ public class EvaluationContext {
}
/**
- * Retrieves an iterable of results associated with the PCollection passed in.
- *
- * @param pcollection Collection we wish to translate.
- * @param <T> Type of elements contained in collection.
- * @return Natively types result associated with collection.
- */
- <T> Iterable<T> get(PCollection<T> pcollection) {
- Iterable<WindowedValue<T>> windowedValues = getWindowedValues(pcollection);
- return Iterables.transform(windowedValues, WindowingHelpers.<T>unwindowValueFunction());
- }
-
- /**
* Return the current views created in the pipeline.
*
* @return SparkPCollectionView
@@ -220,6 +228,15 @@ public class EvaluationContext {
pviews.putPView(view, value, coder);
}
+ /**
+ * Get the map of cache candidates held by the evaluation context.
+ *
+ * @return The current {@link Map} of cache candidates.
+ */
+ public Map<PCollection, Long> getCacheCandidates() {
+ return this.cacheCandidates;
+ }
+
<T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
@SuppressWarnings("unchecked")
BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
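The same candidate map drives putBoundedDatasetFromValues(): when a Create output will be re-read, the RDD is built eagerly so the persisted instance is the one every consumer shares; with a single consumer, materialization stays on demand. A hedged sketch of that decision with the Beam types stripped away (hypothetical names):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only: pick eager vs. lazy dataset creation from the candidate count. */
class DatasetChooser {

  private final Map<String, Long> cacheCandidates = new HashMap<>();

  <T> Supplier<List<T>> datasetFor(String outputId, Supplier<List<T>> compute) {
    Long count = cacheCandidates.get(outputId);
    if (count != null && count > 1) {
      List<T> eager = compute.get(); // build now; the cached instance is shared
      return () -> eager;
    }
    return compute; // single consumer: defer materialization
  }
}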
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 7048be6..c298886 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -91,6 +91,10 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
return jssc;
}
+ public EvaluationContext getEvaluationContext() {
+ return this.ctxt;
+ }
+
private void checkpoint(JavaStreamingContext jssc) {
Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
new file mode 100644
index 0000000..c3b48d8
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * This test checks how the cache candidates map is populated by the runner when evaluating the
+ * pipeline.
+ */
+public class CacheTest {
+
+ @Rule
+ public final transient PipelineRule pipelineRule = PipelineRule.batch();
+
+ @Test
+ public void cacheCandidatesUpdaterTest() throws Exception {
+ Pipeline pipeline = pipelineRule.createPipeline();
+ PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
+ // first read
+ pCollection.apply(Count.<String>globally());
+ // second read
+ // as we access the same PCollection twice, the Spark runner applies an optimization and
+ // caches the RDD representing this PCollection
+ pCollection.apply(Count.<String>globally());
+
+ JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+ SparkRunner.CacheVisitor cacheVisitor =
+ new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
+ pipeline.traverseTopologically(cacheVisitor);
+ assertEquals(2L, (long) ctxt.getCacheCandidates().get(pCollection));
+ }
+
+}
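As a counterpoint to the test above, a PCollection with a single consumer keeps a count of 1 and is not a cache candidate. A hypothetical companion test (same imports and setup as CacheTest) could assert that directly:

  @Test
  public void singleReadIsNotACacheCandidateTest() throws Exception {
    // hypothetical companion to cacheCandidatesUpdaterTest: one consumer only
    Pipeline pipeline = pipelineRule.createPipeline();
    PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
    pCollection.apply(Count.<String>globally()); // single read

    JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
    EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
    SparkRunner.CacheVisitor cacheVisitor =
        new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
    pipeline.traverseTopologically(cacheVisitor);
    // the count stays at 1, below the > 1 caching threshold
    assertEquals(1L, (long) ctxt.getCacheCandidates().get(pCollection));
  }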
http://git-wip-us.apache.org/repos/asf/beam/blob/daa10ddb/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 4dc5dee..2b7b87b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -37,9 +37,9 @@ public class StorageLevelTest {
@Test
public void test() throws Exception {
pipelineRule.getOptions().setStorageLevel("DISK_ONLY");
- Pipeline p = pipelineRule.createPipeline();
+ Pipeline pipeline = pipelineRule.createPipeline();
- PCollection<String> pCollection = p.apply(Create.of("foo"));
+ PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
// by default, the Spark runner doesn't cache the RDD if it is accessed only once.
// So, to "force" the caching of the RDD, we have to read the RDD at least twice.
@@ -50,7 +50,7 @@ public class StorageLevelTest {
PAssert.thatSingleton(output).isEqualTo("Disk Serialized 1x Replicated");
- p.run();
+ pipeline.run();
}
}
[2/2] beam git commit: [BEAM-649] This closes #1739
Posted by jb...@apache.org.
[BEAM-649] This closes #1739
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82b7b861
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82b7b861
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82b7b861
Branch: refs/heads/master
Commit: 82b7b86139167a5eae58a7eb6ace05f2776ada56
Parents: c045b0e daa10dd
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Mar 23 17:29:44 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Mar 23 17:29:44 2017 +0100
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 64 ++++++++++++++++++--
.../spark/translation/BoundedDataset.java | 3 +-
.../spark/translation/EvaluationContext.java | 61 ++++++++++++-------
.../SparkRunnerStreamingContextFactory.java | 4 ++
.../apache/beam/runners/spark/CacheTest.java | 61 +++++++++++++++++++
.../spark/translation/StorageLevelTest.java | 6 +-
6 files changed, 168 insertions(+), 31 deletions(-)
----------------------------------------------------------------------