Posted to commits@beam.apache.org by am...@apache.org on 2016/09/22 22:41:13 UTC

[1/2] incubator-beam git commit: [BEAM-657] Support Read.Bounded primitive.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4872bde8f -> a00d2f810


[BEAM-657] Support Read.Bounded primitive.

Support Read.Bounded primitive.

Avro requires commons-compress for Snappy support.

Create is now supported via Read.Bounded.

Read.Bounded support should resolve the Spark runner's GCS (gs://) I/O issues; see the sketch below.

Remove unused direct translations; addressed by BEAM-668.

Assert default parallelism, close the reader on exception, and other improvements.

Addressed more review comments.

Extra line.
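
For context, a minimal sketch of the pipeline shape this change enables, assuming the pre-release (0.x) Beam API used in this repository. The input path is the sample file WordCountIT now reads unconditionally, and the runner flag matches the one configured in examples/java/pom.xml. TextIO.Read expands to a Read.Bounded over a BoundedSource, which the Spark runner now translates natively through the new SourceRDD, so gs:// input works without a local-file fallback.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadBoundedSketch {
      public static void main(String[] args) {
        // e.g. --runner=org.apache.beam.runners.spark.TestSparkRunner
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        // TextIO.Read.Bound expands to Read.Bounded, so the Spark runner reads
        // this through SourceRDD.Bounded rather than a direct TextIO translation.
        PCollection<String> lines =
            p.apply(TextIO.Read.from("gs://apache-beam-samples/apache/LICENSE"));
        p.run();
      }
    }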


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cccc5848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cccc5848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cccc5848

Branch: refs/heads/master
Commit: cccc58489bc8cbe4d702ff9ae07f932fb96141a1
Parents: 4872bde
Author: Sela <an...@paypal.com>
Authored: Wed Sep 21 15:13:02 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Sep 23 01:31:16 2016 +0300

----------------------------------------------------------------------
 examples/java/pom.xml                           |   2 +-
 .../org/apache/beam/examples/WordCountIT.java   |   6 -
 runners/spark/pom.xml                           |   6 +
 .../apache/beam/runners/spark/SparkRunner.java  |   5 -
 .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++++++++++++++++++
 .../spark/translation/TransformTranslator.java  |  22 ++-
 6 files changed, 223 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 6a39f64..9a48ec6 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -184,7 +184,7 @@
                     <beamTestPipelineOptions>
                       [
                       "--project=apache-beam-testing",
-                      "--tempRoot=/tmp",
+                      "--tempRoot=gs://temp-storage-for-end-to-end-tests",
                       "--runner=org.apache.beam.runners.spark.TestSparkRunner"
                       ]
                     </beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
index b0e0fe0..2f2ea46 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.examples;
 
-import com.google.common.io.Resources;
 import java.util.Date;
 import org.apache.beam.examples.WordCount.WordCountOptions;
 import org.apache.beam.sdk.options.Default;
@@ -63,11 +62,6 @@ public class WordCountIT {
         new FileChecksumMatcher(options.getOutputChecksum(), options.getOutput() + "*"));
 
     String e2eTestInputPath = "gs://apache-beam-samples/apache/LICENSE";
-    // Spark runner currently doesn't support GCS I/O, change default input to:
-    // .../src/test/resources/LICENSE
-    if (options.getRunner().getName().contains("SparkRunner")) {
-      e2eTestInputPath = Resources.getResource("LICENSE").getPath();
-    }
     options.setInputFile(e2eTestInputPath);
 
     WordCount.main(TestPipeline.convertToArgs(options));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 228a90b..60b2de9 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -180,6 +180,12 @@
       <artifactId>joda-time</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.9</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <version>2.4</version>
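
Per the commit note, the commons-compress dependency added above is what Avro needs at runtime when reading Snappy-compressed data files. A hedged sketch of an Avro read that would exercise it, given a Pipeline p like the one in the first sketch above; the bucket, glob, and schema are hypothetical, and the API is the 0.x-era AvroIO:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical record schema; reading a Snappy-compressed Avro file exercises
    // the codec path that needs commons-compress on the Spark classpath.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
            + "\"fields\":[{\"name\":\"x\",\"type\":\"long\"}]}");
    PCollection<GenericRecord> records =
        p.apply(AvroIO.Read.from("gs://my-bucket/records-*.avro").withSchema(schema));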

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 63dfe0d..3888ec2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
-import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
@@ -124,9 +122,6 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
     if (transform instanceof GroupByKey) {
       return (OutputT) ((PCollection) input).apply(
           new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
-    } else if (transform instanceof Create.Values) {
-      return (OutputT) super.apply(
-        new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
     } else {
       return super.apply(transform, input);
     }
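
The Create.Values special case removed above is no longer needed because Create now expands to a Read.Bounded over an in-memory BoundedSource, so the new readBounded() evaluator covers it uniformly. A minimal sketch with hypothetical values:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    // No runner-side override anymore: Create expands to Read.Bounded and is
    // translated through SourceRDD.Bounded like any other bounded read.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> words = p.apply(Create.of("hello", "world"));
    p.run();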

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
new file mode 100644
index 0000000..ec2d2cf
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.Dependency;
+import org.apache.spark.InterruptibleIterator;
+import org.apache.spark.Partition;
+import org.apache.spark.SparkContext;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Classes implementing Beam {@link Source} {@link RDD}s.
+ */
+public class SourceRDD {
+
+  /**
+   * A SourceRDD.Bounded reads input from a {@link BoundedSource} and creates a Spark {@link RDD}.
+   * This is the default way for the SparkRunner to read data from Beam's BoundedSources.
+   */
+  public static class Bounded<T> extends RDD<WindowedValue<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRDD.Bounded.class);
+
+    private final BoundedSource<T> source;
+    private final SparkRuntimeContext runtimeContext;
+    private final int numPartitions;
+
+    // to satisfy Scala API.
+    private static final scala.collection.immutable.List<Dependency<?>> NIL =
+        scala.collection.immutable.List.empty();
+
+    public Bounded(SparkContext sc,
+                   BoundedSource<T> source,
+                   SparkRuntimeContext runtimeContext) {
+      super(sc, NIL, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+      this.source = source;
+      this.runtimeContext = runtimeContext;
+      // the input parallelism is determined by Spark's scheduler backend.
+      // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
+      // when running on Mesos it's 8.
+      // when running local it's the total number of cores (local = 1, local[N] = N,
+      // local[*] = estimation of the machine's cores).
+      // ** the configuration "spark.default.parallelism" takes precedence over all of the above **
+      this.numPartitions = sc.defaultParallelism();
+      checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
+    }
+
+    private static final long DEFAULT_BUNDLE_SIZE = 64 * 1024 * 1024;
+
+    @Override
+    public Partition[] getPartitions() {
+      long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
+      try {
+        desiredSizeBytes = source.getEstimatedSizeBytes(
+            runtimeContext.getPipelineOptions()) / numPartitions;
+      } catch (Exception e) {
+        LOG.warn("Failed to get estimated bundle size for source {}, using default bundle "
+            + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
+      }
+      try {
+        List<? extends Source<T>> partitionedSources = source.splitIntoBundles(desiredSizeBytes,
+            runtimeContext.getPipelineOptions());
+        Partition[] partitions = new SourcePartition[partitionedSources.size()];
+        for (int i = 0; i < partitionedSources.size(); i++) {
+          partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i));
+        }
+        return partitions;
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create partitions for source "
+            + source.getClass().getSimpleName(), e);
+      }
+    }
+
+    @Override
+    public scala.collection.Iterator<WindowedValue<T>> compute(final Partition split,
+                                                               TaskContext context) {
+      final Iterator<WindowedValue<T>> iter = new Iterator<WindowedValue<T>>() {
+        @SuppressWarnings("unchecked")
+        SourcePartition<T> partition = (SourcePartition<T>) split;
+        BoundedSource.BoundedReader<T> reader = createReader(partition);
+
+        private boolean finished = false;
+        private boolean started = false;
+        private boolean closed = false;
+
+        @Override
+        public boolean hasNext() {
+          try {
+            if (!started) {
+              started = true;
+              finished = !reader.start();
+            } else {
+              finished = !reader.advance();
+            }
+            if (finished) {
+              // safely close the reader if there are no more elements left to read.
+              closeIfNotClosed();
+            }
+            return !finished;
+          } catch (IOException e) {
+            closeIfNotClosed();
+            throw new RuntimeException("Failed to read from reader.", e);
+          }
+        }
+
+        @Override
+        public WindowedValue<T> next() {
+          return WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(),
+              reader.getCurrentTimestamp());
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Remove from partition iterator is not allowed.");
+        }
+
+        private void closeIfNotClosed() {
+          if (!closed) {
+            closed = true;
+            try {
+              reader.close();
+            } catch (IOException e) {
+              throw new RuntimeException("Failed to close Reader.", e);
+            }
+          }
+        }
+      };
+
+      return new InterruptibleIterator<>(context,
+          scala.collection.JavaConversions.asScalaIterator(iter));
+    }
+
+    private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
+      try {
+        return ((BoundedSource<T>) partition.source).createReader(
+            runtimeContext.getPipelineOptions());
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
+      }
+    }
+  }
+
+  /**
+   * An input {@link Partition} wrapping the partitioned {@link Source}.
+   */
+  private static class SourcePartition<T> implements Partition {
+
+    private final int rddId;
+    private final int index;
+    private final Source<T> source;
+
+    SourcePartition(int rddId, int index, Source<T> source) {
+      this.rddId = rddId;
+      this.index = index;
+      this.source = source;
+    }
+
+    @Override
+    public int index() {
+      return index;
+    }
+
+    @Override
+    public int hashCode() {
+      return 41 * (41 + rddId) + index;
+    }
+  }
+}
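
As the constructor comment notes, the partition count comes from Spark's defaultParallelism, and "spark.default.parallelism" takes precedence over the backend-specific defaults. A hedged sketch of pinning it with the standard Spark API; the master URL and numbers are illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;

    // Illustrative numbers: with parallelism pinned to 8, getPartitions() asks the
    // source for bundles of getEstimatedSizeBytes / 8, e.g. a 512 MB source yields
    // a 64 MB desired bundle size (the same value as the DEFAULT_BUNDLE_SIZE fallback).
    SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("beam-source-rdd-sketch")
        .set("spark.default.parallelism", "8");
    SparkContext sc = new SparkContext(conf);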

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cccc5848/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 1a0511f..59cc647 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -38,6 +38,7 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.io.SourceRDD;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
 import org.apache.beam.runners.spark.io.hadoop.TemplatedAvroKeyOutputFormat;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -380,6 +382,21 @@ public final class TransformTranslator {
     };
   }
 
+  private static <T> TransformEvaluator<Read.Bounded<T>> readBounded() {
+    return new TransformEvaluator<Read.Bounded<T>>() {
+      @Override
+      public void evaluate(Read.Bounded<T> transform, EvaluationContext context) {
+        final JavaSparkContext jsc = context.getSparkContext();
+        final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
+        // create an RDD from a BoundedSource.
+        JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
+            jsc.sc(), transform.getSource(), runtimeContext).toJavaRDD();
+        // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
+        context.setOutputRDD(transform, input.cache());
+      }
+    };
+  }
+
   private static <K, V> TransformEvaluator<HadoopIO.Read.Bound<K, V>> readHadoop() {
     return new TransformEvaluator<HadoopIO.Read.Bound<K, V>>() {
       @Override
@@ -561,10 +578,7 @@ public final class TransformTranslator {
       .newHashMap();
 
   static {
-    EVALUATORS.put(TextIO.Read.Bound.class, readText());
-    EVALUATORS.put(TextIO.Write.Bound.class, writeText());
-    EVALUATORS.put(AvroIO.Read.Bound.class, readAvro());
-    EVALUATORS.put(AvroIO.Write.Bound.class, writeAvro());
+    EVALUATORS.put(Read.Bounded.class, readBounded());
     EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop());
     EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.Bound.class, parDo());
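
With this registry change, the four direct TextIO/AvroIO evaluators collapse into the single Read.Bounded entry: those transforms expand to Read.Bounded, so one evaluator covers them all. A sketch of the class-keyed lookup the translator effectively performs; this helper is hypothetical, not the actual method in this file:

    // Hypothetical helper mirroring the EVALUATORS map above: any transform that
    // expands to Read.Bounded (TextIO, AvroIO, Create) resolves to readBounded().
    private static <TransformT extends PTransform<?, ?>>
        TransformEvaluator<TransformT> evaluatorFor(Class<TransformT> clazz) {
      @SuppressWarnings("unchecked")
      TransformEvaluator<TransformT> evaluator =
          (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
      if (evaluator == null) {
        throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
      }
      return evaluator;
    }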


[2/2] incubator-beam git commit: This closes #983

Posted by am...@apache.org.
This closes #983


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a00d2f81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a00d2f81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a00d2f81

Branch: refs/heads/master
Commit: a00d2f810992c450c14eab2bb5e3aa3ad3f80f74
Parents: 4872bde cccc584
Author: Sela <an...@paypal.com>
Authored: Fri Sep 23 01:32:26 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Fri Sep 23 01:32:26 2016 +0300

----------------------------------------------------------------------
 examples/java/pom.xml                           |   2 +-
 .../org/apache/beam/examples/WordCountIT.java   |   6 -
 runners/spark/pom.xml                           |   6 +
 .../apache/beam/runners/spark/SparkRunner.java  |   5 -
 .../apache/beam/runners/spark/io/SourceRDD.java | 198 +++++++++++++++++++
 .../spark/translation/TransformTranslator.java  |  22 ++-
 6 files changed, 223 insertions(+), 16 deletions(-)
----------------------------------------------------------------------