You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/20 23:43:01 UTC
[1/2] incubator-beam git commit: Closes #214
Repository: incubator-beam
Updated Branches:
refs/heads/master b36ecc493 -> de601a833
Closes #214
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de601a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de601a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de601a83
Branch: refs/heads/master
Commit: de601a833cf3e92e973b090c672ad25849c52a96
Parents: b36ecc4 46e4187
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 20 14:42:47 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 20 14:42:47 2016 -0700
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineRunner.java | 5 ++
.../util/SinglePrimitiveOutputPTransform.java | 48 ++++++++++++++++++++
2 files changed, 53 insertions(+)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Override Create in the SparkPipelineRunner
Posted by dh...@apache.org.
Override Create in the SparkPipelineRunner
This allows existing pipelines to continue to function by keeping the
graph structure identical while replacing Create with a Read.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/46e4187b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/46e4187b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/46e4187b
Branch: refs/heads/master
Commit: 46e4187bd9f16da16444baa0ff20945f16f3746c
Parents: b36ecc4
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 19 16:12:32 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 20 14:42:47 2016 -0700
----------------------------------------------------------------------
.../beam/runners/spark/SparkPipelineRunner.java | 5 ++
.../util/SinglePrimitiveOutputPTransform.java | 48 ++++++++++++++++++++
2 files changed, 53 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46e4187b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 2b33e7a..8635cfb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -27,12 +27,14 @@ import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
+import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
@@ -124,6 +126,9 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
if (transform instanceof GroupByKey) {
return (OT) ((PCollection) input).apply(
new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+ } else if (transform instanceof Create.Values) {
+ return (OT) super.apply(
+ new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
} else {
return super.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/46e4187b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
new file mode 100644
index 0000000..9a8aa2e
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PInput;
+
+public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
+ private PTransform<PInput, PCollection<T>> transform;
+
+ public SinglePrimitiveOutputPTransform(PTransform<PInput, PCollection<T>> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollection<T> apply(PInput input) {
+ try {
+ PCollection<T> collection = PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ collection.setCoder(transform.getDefaultOutputCoder(input, collection));
+ return collection;
+ } catch (CannotProvideCoderException e) {
+ throw new IllegalArgumentException(
+ "Unable to infer a coder and no Coder was specified. "
+ + "Please set a coder by invoking Create.withCoder() explicitly.",
+ e);
+ }
+ }
+}