You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/12/01 13:34:21 UTC
[1/2] incubator-beam git commit: [BEAM-918] Allow users to define the
storage level via pipeline options
Repository: incubator-beam
Updated Branches:
refs/heads/master 711c68092 -> 0c875ba70
[BEAM-918] Allow users to define the storage level via pipeline options
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d99829dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d99829dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d99829dd
Branch: refs/heads/master
Commit: d99829dd99db4090ceb7e5eefce50ee513c5458e
Parents: 711c680
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Nov 17 12:38:00 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Dec 1 11:38:25 2016 +0100
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 5 ++
.../spark/translation/BoundedDataset.java | 5 +-
.../beam/runners/spark/translation/Dataset.java | 2 +-
.../spark/translation/EvaluationContext.java | 10 +++-
.../translation/StorageLevelPTransform.java | 43 +++++++++++++++
.../spark/translation/TransformTranslator.java | 27 ++++++++++
.../translation/streaming/UnboundedDataset.java | 13 ++++-
.../spark/translation/StorageLevelTest.java | 56 ++++++++++++++++++++
8 files changed, 155 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 0fd790e..3f8b379 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -44,6 +44,11 @@ public interface SparkPipelineOptions
Long getBatchIntervalMillis();
void setBatchIntervalMillis(Long batchInterval);
+ @Description("Batch default storage level")
+ @Default.String("MEMORY_ONLY")
+ String getStorageLevel();
+ void setStorageLevel(String storageLevel);
+
@Description("Minimum time to spend on read, for each micro-batch.")
@Default.Long(200)
Long getMinReadTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 774efb9..1cfb0e0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
/**
* Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are sometimes
@@ -97,8 +98,8 @@ public class BoundedDataset<T> implements Dataset {
}
@Override
- public void cache() {
- rdd.cache();
+ public void cache(String storageLevel) {
+ rdd.persist(StorageLevel.fromString(storageLevel));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
index 36b03fe..b5d550e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
*/
public interface Dataset extends Serializable {
- void cache();
+ void cache(String storageLevel);
void action();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 1183fbb..ae45609 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.sdk.AggregatorRetrievalException;
@@ -155,7 +156,7 @@ public class EvaluationContext implements EvaluationResult {
leaves.remove(dataset);
if (multiReads.contains(pvalue)) {
// Ensure the RDD is marked as cached
- dataset.cache();
+ dataset.cache(storageLevel());
} else {
multiReads.add(pvalue);
}
@@ -172,7 +173,8 @@ public class EvaluationContext implements EvaluationResult {
*/
public void computeOutputs() {
for (Dataset dataset : leaves) {
- dataset.cache(); // cache so that any subsequent get() is cheap.
+ // cache so that any subsequent get() is cheap.
+ dataset.cache(storageLevel());
dataset.action(); // force computation.
}
}
@@ -295,4 +297,8 @@ public class EvaluationContext implements EvaluationResult {
private boolean isStreamingPipeline() {
return jssc != null;
}
+
+ private String storageLevel() {
+ return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
new file mode 100644
index 0000000..6944dbf
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Get RDD storage level for the input PCollection (mostly used for testing purpose).
+ */
+public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> {
+
+ @Override
+ public PCollection<String> apply(PCollection<?> input) {
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED);
+ }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 60d668e..66da181 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -26,6 +26,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh
import com.google.common.collect.Maps;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
@@ -34,6 +35,7 @@ import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
@@ -42,6 +44,7 @@ import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
@@ -78,6 +81,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
+
import scala.Tuple2;
@@ -583,6 +587,27 @@ public final class TransformTranslator {
};
}
+ private static TransformEvaluator<StorageLevelPTransform> storageLevel() {
+ return new TransformEvaluator<StorageLevelPTransform>() {
+ @Override
+ public void evaluate(StorageLevelPTransform transform, EvaluationContext context) {
+ JavaRDD rdd = ((BoundedDataset) (context).borrowDataset(transform)).getRDD();
+ JavaSparkContext javaSparkContext = context.getSparkContext();
+
+ WindowedValue.ValueOnlyWindowedValueCoder<String> windowCoder =
+ WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+ JavaRDD output =
+ javaSparkContext.parallelize(
+ CoderHelpers.toByteArrays(
+ Collections.singletonList(rdd.getStorageLevel().description()),
+ StringUtf8Coder.of()))
+ .map(CoderHelpers.fromByteFunction(windowCoder));
+
+ context.putDataset(transform, new BoundedDataset<String>(output));
+ }
+ };
+ }
+
private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
.newHashMap();
@@ -602,6 +627,8 @@ public final class TransformTranslator {
EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
EVALUATORS.put(Window.Bound.class, window());
+ // mostly test evaluators
+ EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 67adee2..d059c7e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -31,12 +31,17 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* DStream holder Can also crate a DStream from a supplied queue of values, but mainly for testing.
*/
public class UnboundedDataset<T> implements Dataset {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);
+
// only set if creating a DStream from a static collection
@Nullable private transient JavaStreamingContext jssc;
@@ -81,12 +86,18 @@ public class UnboundedDataset<T> implements Dataset {
return dStream;
}
- @Override
public void cache() {
dStream.cache();
}
@Override
+ public void cache(String storageLevel) {
+ // we "force" MEMORY storage level in streaming
+ LOG.warn("Provided StorageLevel ignored for stream, using default level");
+ cache();
+ }
+
+ @Override
public void action() {
dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
new file mode 100644
index 0000000..48105e1
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test the RDD storage level defined by user.
+ */
+public class StorageLevelTest {
+
+ @Rule
+ public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
+
+ @Test
+ public void test() throws Exception {
+ pipelineOptions.getOptions().setStorageLevel("DISK_ONLY");
+ Pipeline p = Pipeline.create(pipelineOptions.getOptions());
+
+ PCollection<String> pCollection = p.apply(Create.of("foo"));
+
+ // by default, the Spark runner doesn't cache the RDD if it accessed only one time.
+ // So, to "force" the caching of the RDD, we have to call the RDD at least two time.
+ // That's why we are using Count fn on the PCollection.
+ pCollection.apply(Count.<String>globally());
+
+ PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
+
+ PAssert.thatSingleton(output).isEqualTo("Disk Serialized 1x Replicated");
+
+ p.run();
+ }
+
+}
[2/2] incubator-beam git commit: [BEAM-918] This closes #1370
Posted by jb...@apache.org.
[BEAM-918] This closes #1370
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c875ba7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c875ba7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c875ba7
Branch: refs/heads/master
Commit: 0c875ba704c2501c3215ffd588d02d2e4117ded2
Parents: 711c680 d99829d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Dec 1 11:43:36 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Dec 1 11:43:36 2016 +0100
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 5 ++
.../spark/translation/BoundedDataset.java | 5 +-
.../beam/runners/spark/translation/Dataset.java | 2 +-
.../spark/translation/EvaluationContext.java | 10 +++-
.../translation/StorageLevelPTransform.java | 43 +++++++++++++++
.../spark/translation/TransformTranslator.java | 27 ++++++++++
.../translation/streaming/UnboundedDataset.java | 13 ++++-
.../spark/translation/StorageLevelTest.java | 56 ++++++++++++++++++++
8 files changed, 155 insertions(+), 6 deletions(-)
----------------------------------------------------------------------