Posted to commits@beam.apache.org by jb...@apache.org on 2016/12/01 13:34:21 UTC

[1/2] incubator-beam git commit: [BEAM-918] Allow users to define the storage level via pipeline options

Repository: incubator-beam
Updated Branches:
  refs/heads/master 711c68092 -> 0c875ba70


[BEAM-918] Allow users to define the storage level via pipeline options
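
For context, a minimal sketch of how a user can set the new option when building a pipeline (the option name and its MEMORY_ONLY default come from the SparkPipelineOptions change below; the surrounding setup is illustrative):

  import org.apache.beam.runners.spark.SparkPipelineOptions;
  import org.apache.beam.runners.spark.SparkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  SparkPipelineOptions options =
      PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
  options.setRunner(SparkRunner.class);
  // any name understood by Spark's StorageLevel.fromString(...), e.g.
  // MEMORY_ONLY (the default), MEMORY_AND_DISK, DISK_ONLY
  options.setStorageLevel("MEMORY_AND_DISK");
  Pipeline p = Pipeline.create(options);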


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d99829dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d99829dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d99829dd

Branch: refs/heads/master
Commit: d99829dd99db4090ceb7e5eefce50ee513c5458e
Parents: 711c680
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Nov 17 12:38:00 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Dec 1 11:38:25 2016 +0100

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  5 ++
 .../spark/translation/BoundedDataset.java       |  5 +-
 .../beam/runners/spark/translation/Dataset.java |  2 +-
 .../spark/translation/EvaluationContext.java    | 10 +++-
 .../translation/StorageLevelPTransform.java     | 43 +++++++++++++++
 .../spark/translation/TransformTranslator.java  | 27 ++++++++++
 .../translation/streaming/UnboundedDataset.java | 13 ++++-
 .../spark/translation/StorageLevelTest.java     | 56 ++++++++++++++++++++
 8 files changed, 155 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 0fd790e..3f8b379 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -44,6 +44,11 @@ public interface SparkPipelineOptions
   Long getBatchIntervalMillis();
   void setBatchIntervalMillis(Long batchInterval);
 
+  @Description("Batch default storage level")
+  @Default.String("MEMORY_ONLY")
+  String getStorageLevel();
+  void setStorageLevel(String storageLevel);
+
   @Description("Minimum time to spend on read, for each micro-batch.")
   @Default.Long(200)
   Long getMinReadTimeMillis();
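
Because Beam derives command-line flags from PipelineOptions getters, the new option should also be settable from program arguments; a hedged sketch (the argument values are illustrative):

  import org.apache.beam.runners.spark.SparkPipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  String[] args = {"--runner=SparkRunner", "--storageLevel=DISK_ONLY"};
  SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(SparkPipelineOptions.class);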

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 774efb9..1cfb0e0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
 
 /**
  * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are sometimes
@@ -97,8 +98,8 @@ public class BoundedDataset<T> implements Dataset {
   }
 
   @Override
-  public void cache() {
-    rdd.cache();
+  public void cache(String storageLevel) {
+    rdd.persist(StorageLevel.fromString(storageLevel));
   }
 
   @Override
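
The runner now delegates to Spark's StorageLevel.fromString(...), which resolves the configured name to one of the predefined levels and fails fast on unknown names; a small sketch of the assumed behavior:

  import org.apache.spark.storage.StorageLevel;

  StorageLevel level = StorageLevel.fromString("MEMORY_ONLY"); // the option default
  // an invalid name surfaces immediately instead of silently falling back
  try {
    StorageLevel.fromString("MEMORY_ONLY_TYPO");
  } catch (IllegalArgumentException e) {
    // e.g. "Invalid StorageLevel: MEMORY_ONLY_TYPO" (exact wording may vary)
  }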

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
index 36b03fe..b5d550e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/Dataset.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
  */
 public interface Dataset extends Serializable {
 
-  void cache();
+  void cache(String storageLevel);
 
   void action();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 1183fbb..ae45609 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.sdk.AggregatorRetrievalException;
@@ -155,7 +156,7 @@ public class EvaluationContext implements EvaluationResult {
     leaves.remove(dataset);
     if (multiReads.contains(pvalue)) {
       // Ensure the RDD is marked as cached
-      dataset.cache();
+      dataset.cache(storageLevel());
     } else {
       multiReads.add(pvalue);
     }
@@ -172,7 +173,8 @@ public class EvaluationContext implements EvaluationResult {
    */
   public void computeOutputs() {
     for (Dataset dataset : leaves) {
-      dataset.cache(); // cache so that any subsequent get() is cheap.
+      // cache so that any subsequent get() is cheap.
+      dataset.cache(storageLevel());
       dataset.action(); // force computation.
     }
   }
@@ -295,4 +297,8 @@ public class EvaluationContext implements EvaluationResult {
   private boolean isStreamingPipeline() {
     return jssc != null;
   }
+
+  private String storageLevel() {
+    return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
new file mode 100644
index 0000000..6944dbf
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/StorageLevelPTransform.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Gets the RDD storage level for the input PCollection (mostly used for testing purposes).
+ */
+public final class StorageLevelPTransform extends PTransform<PCollection<?>, PCollection<String>> {
+
+  @Override
+  public PCollection<String> apply(PCollection<?> input) {
+    return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+        WindowingStrategy.globalDefault(),
+        PCollection.IsBounded.BOUNDED);
+  }
+
+  @Override
+  public Coder getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 60d668e..66da181 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -26,6 +26,7 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceSh
 
 import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
@@ -34,6 +35,7 @@ import org.apache.beam.runners.core.AssignWindowsDoFn;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.SourceRDD;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
@@ -42,6 +44,7 @@ import org.apache.beam.runners.spark.io.hadoop.TemplatedTextOutputFormat;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
@@ -78,6 +81,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
+
 import scala.Tuple2;
 
 
@@ -583,6 +587,27 @@ public final class TransformTranslator {
     };
   }
 
+  private static TransformEvaluator<StorageLevelPTransform> storageLevel() {
+    return new TransformEvaluator<StorageLevelPTransform>() {
+      @Override
+      public void evaluate(StorageLevelPTransform transform, EvaluationContext context) {
+        JavaRDD rdd = ((BoundedDataset) context.borrowDataset(transform)).getRDD();
+        JavaSparkContext javaSparkContext = context.getSparkContext();
+
+        WindowedValue.ValueOnlyWindowedValueCoder<String> windowCoder =
+            WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+        JavaRDD output =
+            javaSparkContext.parallelize(
+                CoderHelpers.toByteArrays(
+                    Collections.singletonList(rdd.getStorageLevel().description()),
+                    StringUtf8Coder.of()))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+
+        context.putDataset(transform, new BoundedDataset<String>(output));
+      }
+    };
+  }
+
   private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
       .newHashMap();
 
@@ -602,6 +627,8 @@ public final class TransformTranslator {
     EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
     EVALUATORS.put(Window.Bound.class, window());
+    // mostly test evaluators
+    EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 67adee2..d059c7e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -31,12 +31,17 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
 * DStream holder. Can also create a DStream from a supplied queue of values, but mainly for testing.
  */
 public class UnboundedDataset<T> implements Dataset {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);
+
   // only set if creating a DStream from a static collection
   @Nullable private transient JavaStreamingContext jssc;
 
@@ -81,12 +86,18 @@ public class UnboundedDataset<T> implements Dataset {
     return dStream;
   }
 
-  @Override
   public void cache() {
     dStream.cache();
   }
 
   @Override
+  public void cache(String storageLevel) {
+    // we "force" the default MEMORY storage level in streaming
+    LOG.warn("Provided StorageLevel ignored for stream, using default level");
+    cache();
+  }
+
+  @Override
   public void action() {
     dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d99829dd/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
new file mode 100644
index 0000000..48105e1
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests the RDD storage level defined by the user.
+ */
+public class StorageLevelTest {
+
+  @Rule
+  public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
+
+  @Test
+  public void test() throws Exception {
+    pipelineOptions.getOptions().setStorageLevel("DISK_ONLY");
+    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
+
+    PCollection<String> pCollection = p.apply(Create.of("foo"));
+
+    // By default, the Spark runner doesn't cache an RDD that is accessed only once.
+    // So, to "force" caching of the RDD, we have to use the RDD at least twice.
+    // That's why we apply the Count fn to the PCollection.
+    pCollection.apply(Count.<String>globally());
+
+    PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
+
+    PAssert.thatSingleton(output).isEqualTo("Disk Serialized 1x Replicated");
+
+    p.run();
+  }
+
+}
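
The expected string in the assertion above comes from Spark's StorageLevel.description(), which spells out the level's flags (disk/memory, serialized or deserialized, replication factor); a quick check of that assumption:

  import org.apache.spark.storage.StorageLevel;

  // DISK_ONLY = on disk, serialized, replication factor 1
  System.out.println(StorageLevel.DISK_ONLY().description());
  // prints: Disk Serialized 1x Replicated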


[2/2] incubator-beam git commit: [BEAM-918] This closes #1370

Posted by jb...@apache.org.
[BEAM-918] This closes #1370


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c875ba7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c875ba7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c875ba7

Branch: refs/heads/master
Commit: 0c875ba704c2501c3215ffd588d02d2e4117ded2
Parents: 711c680 d99829d
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Thu Dec 1 11:43:36 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Thu Dec 1 11:43:36 2016 +0100

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  5 ++
 .../spark/translation/BoundedDataset.java       |  5 +-
 .../beam/runners/spark/translation/Dataset.java |  2 +-
 .../spark/translation/EvaluationContext.java    | 10 +++-
 .../translation/StorageLevelPTransform.java     | 43 +++++++++++++++
 .../spark/translation/TransformTranslator.java  | 27 ++++++++++
 .../translation/streaming/UnboundedDataset.java | 13 ++++-
 .../spark/translation/StorageLevelTest.java     | 56 ++++++++++++++++++++
 8 files changed, 155 insertions(+), 6 deletions(-)
----------------------------------------------------------------------