You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/20 23:43:01 UTC

[1/2] incubator-beam git commit: Closes #214

Repository: incubator-beam
Updated Branches:
  refs/heads/master b36ecc493 -> de601a833


Closes #214


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de601a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de601a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de601a83

Branch: refs/heads/master
Commit: de601a833cf3e92e973b090c672ad25849c52a96
Parents: b36ecc4 46e4187
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 20 14:42:47 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 20 14:42:47 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineRunner.java |  5 ++
 .../util/SinglePrimitiveOutputPTransform.java   | 48 ++++++++++++++++++++
 2 files changed, 53 insertions(+)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Override Create in the SparkPipelineRunner

Posted by dh...@apache.org.
Override Create in the SparkPipelineRunner

This allows existing pipelines to continue to function by keeping the
graph structure identical while replacing Create with a Read.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46e4187b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46e4187b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46e4187b

Branch: refs/heads/master
Commit: 46e4187bd9f16da16444baa0ff20945f16f3746c
Parents: b36ecc4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 19 16:12:32 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 20 14:42:47 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineRunner.java |  5 ++
 .../util/SinglePrimitiveOutputPTransform.java   | 48 ++++++++++++++++++++
 2 files changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46e4187b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 2b33e7a..8635cfb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -27,12 +27,14 @@ import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
 import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
@@ -124,6 +126,9 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
     if (transform instanceof GroupByKey) {
       return (OT) ((PCollection) input).apply(
           new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+    } else if (transform instanceof Create.Values) {
+      return (OT) super.apply(
+        new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
     } else {
       return super.apply(transform, input);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46e4187b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
new file mode 100644
index 0000000..9a8aa2e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PInput;
+
+/** Stands in for a wrapped transform; applying it creates one primitive output PCollection. */
+public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
+  private final PTransform<PInput, PCollection<T>> transform; // only asked for its output coder
+
+  public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public PCollection<T> apply(PInput input) {
+    try {
+      PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
+              input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+      collection.setCoder(transform.getDefaultOutputCoder(input, collection));
+      return collection;
+    } catch (CannotProvideCoderException e) { // surfaced unchecked, original kept as the cause
+      throw new IllegalArgumentException(
+          "Unable to infer a coder and no Coder was specified. "
+              + "Please set a coder by invoking Create.withCoder() explicitly.", e);
+    }
+  }
+}