You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/15 19:48:03 UTC
[06/23] incubator-beam git commit: [BEAM-11] Spark runner directory
structure and pom setup.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
new file mode 100644
index 0000000..fd6f5da
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ShardNameTemplateHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
+
+ public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
+ public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
+ public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";
+
+ private ShardNameTemplateHelper() {
+ }
+
+ public static <K, V> Path getDefaultWorkFile(FileOutputFormat<K, V> format,
+ TaskAttemptContext context) throws IOException {
+ FileOutputCommitter committer =
+ (FileOutputCommitter) format.getOutputCommitter(context);
+ return new Path(committer.getWorkPath(), getOutputFile(context));
+ }
+
+ private static String getOutputFile(TaskAttemptContext context) {
+ TaskID taskId = context.getTaskAttemptID().getTaskID();
+ int partition = taskId.getId();
+
+ String filePrefix = context.getConfiguration().get(OUTPUT_FILE_PREFIX);
+ String fileTemplate = context.getConfiguration().get(OUTPUT_FILE_TEMPLATE);
+ String fileSuffix = context.getConfiguration().get(OUTPUT_FILE_SUFFIX);
+ return filePrefix + replaceShardNumber(fileTemplate, partition) + fileSuffix;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
new file mode 100644
index 0000000..4feaff6
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
+ implements ShardNameTemplateAware {
+
+ @Override
+ public void checkOutputSpecs(JobContext job) {
+ // don't fail if the output already exists
+ }
+
+ @Override
+ protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
+ Path path = ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+ return path.getFileSystem(context.getConfiguration()).create(path);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
new file mode 100644
index 0000000..922b906
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
+ implements ShardNameTemplateAware {
+
+ @Override
+ public void checkOutputSpecs(JobContext job) {
+ // don't fail if the output already exists
+ }
+
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ // note that the passed-in extension is ignored since it comes from the template
+ return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
new file mode 100644
index 0000000..1e53dce
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.io.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
+ implements ShardNameTemplateAware {
+
+ @Override
+ public void checkOutputSpecs(JobContext job) {
+ // don't fail if the output already exists
+ }
+
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context,
+ String extension) throws IOException {
+ // note that the passed-in extension is ignored since it comes from the template
+ return ShardNameTemplateHelper.getDefaultWorkFile(this, context);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
new file mode 100644
index 0000000..17edba3
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+
+/**
+ * Options used to configure Spark streaming.
+ */
+public interface SparkStreamingPipelineOptions extends SparkPipelineOptions {
+ @Description("Timeout to wait (in msec) for the streaming execution so stop, -1 runs until " +
+ "execution is stopped")
+ @Default.Long(-1)
+ Long getTimeout();
+
+ void setTimeout(Long batchInterval);
+
+ @Override
+ @Default.Boolean(true)
+ boolean isStreaming();
+
+ @Override
+ @Default.String("spark streaming dataflow pipeline job")
+ String getAppName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
new file mode 100644
index 0000000..822feb4
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+
+public final class SparkStreamingPipelineOptionsFactory {
+
+ private SparkStreamingPipelineOptionsFactory() {
+ }
+
+ public static SparkStreamingPipelineOptions create() {
+ return PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..2c5414d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/SparkStreamingPipelineOptionsRegistrar.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
+import com.google.common.collect.ImmutableList;
+
+public class SparkStreamingPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(SparkStreamingPipelineOptions
+ .class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
new file mode 100644
index 0000000..9d1d786
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingEvaluationContext.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+import org.apache.beam.runners.spark.EvaluationContext;
+import org.apache.beam.runners.spark.SparkRuntimeContext;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+
+/**
+ * Streaming evaluation context helps to handle streaming.
+ */
+public class StreamingEvaluationContext extends EvaluationContext {
+
+ private final JavaStreamingContext jssc;
+ private final long timeout;
+ private final Map<PValue, DStreamHolder<?>> pstreams = new LinkedHashMap<>();
+ private final Set<DStreamHolder<?>> leafStreams = new LinkedHashSet<>();
+
+ public StreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
+ JavaStreamingContext jssc, long timeout) {
+ super(jsc, pipeline);
+ this.jssc = jssc;
+ this.timeout = timeout;
+ }
+
+ /**
+ * DStream holder Can also crate a DStream from a supplied queue of values, but mainly for
+ * testing.
+ */
+ private class DStreamHolder<T> {
+
+ private Iterable<Iterable<T>> values;
+ private Coder<T> coder;
+ private JavaDStream<WindowedValue<T>> dStream;
+
+ DStreamHolder(Iterable<Iterable<T>> values, Coder<T> coder) {
+ this.values = values;
+ this.coder = coder;
+ }
+
+ DStreamHolder(JavaDStream<WindowedValue<T>> dStream) {
+ this.dStream = dStream;
+ }
+
+ @SuppressWarnings("unchecked")
+ JavaDStream<WindowedValue<T>> getDStream() {
+ if (dStream == null) {
+ // create the DStream from values
+ Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+ for (Iterable<T> v : values) {
+ setOutputRDDFromValues(currentTransform.getTransform(), v, coder);
+ rddQueue.offer((JavaRDD<WindowedValue<T>>) getOutputRDD(currentTransform.getTransform()));
+ }
+ // create dstream from queue, one at a time, no defaults
+ // mainly for unit test so no reason to have this configurable
+ dStream = jssc.queueStream(rddQueue, true);
+ }
+ return dStream;
+ }
+ }
+
+ <T> void setDStreamFromQueue(
+ PTransform<?, ?> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
+ pstreams.put((PValue) getOutput(transform), new DStreamHolder<>(values, coder));
+ }
+
+ <T> void setStream(PTransform<?, ?> transform, JavaDStream<WindowedValue<T>> dStream) {
+ PValue pvalue = (PValue) getOutput(transform);
+ DStreamHolder<T> dStreamHolder = new DStreamHolder<>(dStream);
+ pstreams.put(pvalue, dStreamHolder);
+ leafStreams.add(dStreamHolder);
+ }
+
+ boolean hasStream(PTransform<?, ?> transform) {
+ PValue pvalue = (PValue) getInput(transform);
+ return pstreams.containsKey(pvalue);
+ }
+
+ JavaDStreamLike<?, ?, ?> getStream(PTransform<?, ?> transform) {
+ return getStream((PValue) getInput(transform));
+ }
+
+ JavaDStreamLike<?, ?, ?> getStream(PValue pvalue) {
+ DStreamHolder<?> dStreamHolder = pstreams.get(pvalue);
+ JavaDStreamLike<?, ?, ?> dStream = dStreamHolder.getDStream();
+ leafStreams.remove(dStreamHolder);
+ return dStream;
+ }
+
+ // used to set the RDD from the DStream in the RDDHolder for transformation
+ <T> void setInputRDD(
+ PTransform<? extends PInput, ?> transform, JavaRDDLike<WindowedValue<T>, ?> rdd) {
+ setRDD((PValue) getInput(transform), rdd);
+ }
+
+ // used to get the RDD transformation output and use it as the DStream transformation output
+ JavaRDDLike<?, ?> getOutputRDD(PTransform<?, ?> transform) {
+ return getRDD((PValue) getOutput(transform));
+ }
+
+ public JavaStreamingContext getStreamingContext() {
+ return jssc;
+ }
+
+ @Override
+ protected void computeOutputs() {
+ for (DStreamHolder<?> streamHolder : leafStreams) {
+ computeOutput(streamHolder);
+ }
+ }
+
+ private static <T> void computeOutput(DStreamHolder<T> streamHolder) {
+ streamHolder.getDStream().foreachRDD(new Function<JavaRDD<WindowedValue<T>>, Void>() {
+ @Override
+ public Void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+ rdd.rdd().cache();
+ rdd.count();
+ return null;
+ }
+ }); // force a DStream action
+ }
+
+ @Override
+ public void close() {
+ if (timeout > 0) {
+ jssc.awaitTerminationOrTimeout(timeout);
+ } else {
+ jssc.awaitTermination();
+ }
+ //TODO: stop gracefully ?
+ jssc.stop(false, false);
+ state = State.DONE;
+ super.close();
+ }
+
+ private State state = State.RUNNING;
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ //---------------- override in order to expose in package
+ @Override
+ protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ return super.getInput(transform);
+ }
+ @Override
+ protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+ return super.getOutput(transform);
+ }
+
+ @Override
+ protected JavaSparkContext getSparkContext() {
+ return super.getSparkContext();
+ }
+
+ @Override
+ protected SparkRuntimeContext getRuntimeContext() {
+ return super.getRuntimeContext();
+ }
+
+ @Override
+ protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+ super.setCurrentTransform(transform);
+ }
+
+ @Override
+ protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+ return super.getCurrentTransform();
+ }
+
+ @Override
+ protected <T> void setOutputRDD(PTransform<?, ?> transform,
+ JavaRDDLike<WindowedValue<T>, ?> rdd) {
+ super.setOutputRDD(transform, rdd);
+ }
+
+ @Override
+ protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+ Coder<T> coder) {
+ super.setOutputRDDFromValues(transform, values, coder);
+ }
+
+ @Override
+ protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+ return super.hasOutputRDD(transform);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
new file mode 100644
index 0000000..c78c7fa
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingTransformTranslator.java
@@ -0,0 +1,415 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.streaming;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+import com.google.common.reflect.TypeToken;
+import kafka.serializer.Decoder;
+
+import org.apache.beam.runners.spark.DoFnFunction;
+import org.apache.beam.runners.spark.EvaluationContext;
+import org.apache.beam.runners.spark.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.TransformEvaluator;
+import org.apache.beam.runners.spark.TransformTranslator;
+import org.apache.beam.runners.spark.WindowingHelpers;
+import org.apache.beam.runners.spark.io.ConsoleIO;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import scala.Tuple2;
+
+
+/**
+ * Supports translation between a DataFlow transform, and Spark's operations on DStreams.
+ */
+public final class StreamingTransformTranslator {
+
+ private StreamingTransformTranslator() {
+ }
+
+ private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
+ return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
+ @Override
+ public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
+ @SuppressWarnings("unchecked")
+ JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
+ (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+ ((StreamingEvaluationContext) context).getStream(transform);
+ dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
+ }
+ };
+ }
+
+ private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
+ return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+ @Override
+ public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ JavaStreamingContext jssc = sec.getStreamingContext();
+ Class<K> keyClazz = transform.getKeyClass();
+ Class<V> valueClazz = transform.getValueClass();
+ Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
+ Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
+ Map<String, String> kafkaParams = transform.getKafkaParams();
+ Set<String> topics = transform.getTopics();
+ JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
+ valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
+ JavaDStream<WindowedValue<KV<K, V>>> inputStream =
+ inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
+ @Override
+ public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
+ return KV.of(t2._1(), t2._2());
+ }
+ }).map(WindowingHelpers.<KV<K, V>>windowFunction());
+ sec.setStream(transform, inputStream);
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<Create.Values<T>> create() {
+ return new TransformEvaluator<Create.Values<T>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void evaluate(Create.Values<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ Iterable<T> elems = transform.getElements();
+ Coder<T> coder = sec.getOutput(transform).getCoder();
+ if (coder != VoidCoder.of()) {
+ // actual create
+ sec.setOutputRDDFromValues(transform, elems, coder);
+ } else {
+ // fake create as an input
+ // creates a stream with a single batch containing a single null element
+ // to invoke following transformations once
+ // to support DataflowAssert
+ sec.setDStreamFromQueue(transform,
+ Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)),
+ (Coder<Void>) coder);
+ }
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
+ return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+ @Override
+ public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ Iterable<Iterable<T>> values = transform.getQueuedValues();
+ Coder<T> coder = sec.getOutput(transform).getCoder();
+ sec.setDStreamFromQueue(transform, values, coder);
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
+ return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ PCollectionList<T> pcs = sec.getInput(transform);
+ JavaDStream<WindowedValue<T>> first =
+ (JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(0));
+ List<JavaDStream<WindowedValue<T>>> rest = Lists.newArrayListWithCapacity(pcs.size() - 1);
+ for (int i = 1; i < pcs.size(); i++) {
+ rest.add((JavaDStream<WindowedValue<T>>) sec.getStream(pcs.get(i)));
+ }
+ JavaDStream<WindowedValue<T>> dstream = sec.getStreamingContext().union(first, rest);
+ sec.setStream(transform, dstream);
+ }
+ };
+ }
+
+ private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+ final SparkPipelineTranslator rddTranslator) {
+ return new TransformEvaluator<PT>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void evaluate(PT transform, EvaluationContext context) {
+ TransformEvaluator<PT> rddEvaluator =
+ rddTranslator.translate((Class<PT>) transform.getClass());
+
+ StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+ if (sec.hasStream(transform)) {
+ JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> dStream =
+ (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+ sec.getStream(transform);
+
+ sec.setStream(transform, dStream
+ .transform(new RDDTransform<>(sec, rddEvaluator, transform)));
+ } else {
+ // if the transformation requires direct access to RDD (not in stream)
+ // this is used for "fake" transformations like with DataflowAssert
+ rddEvaluator.evaluate(transform, context);
+ }
+ }
+ };
+ }
+
+  /**
+   * Function passed to {@code DStream.transform()} that evaluates a batch (RDD) transformation
+   * on each RDD of the input stream. If the evaluated transformation does not register an output
+   * RDD, an empty RDD is registered and returned as a placeholder output.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDTransform<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
+
+    private final StreamingEvaluationContext context;
+    // the context's current transform when this function was created; it is made current again
+    // for the duration of every call
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
+        PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    /**
+     * Evaluates the wrapped transformation with {@code rdd} as its input.
+     *
+     * @param rdd the input RDD for this invocation
+     * @return the output RDD registered by the evaluator, or an empty RDD if it registered none
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public JavaRDD<WindowedValue<Object>>
+        call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      try {
+        context.setInputRDD(transform, rdd);
+        rddEvaluator.evaluate(transform, context);
+        if (!context.hasOutputRDD(transform)) {
+          // fake RDD as output
+          context.setOutputRDD(transform,
+              context.getSparkContext().<WindowedValue<Object>>emptyRDD());
+        }
+        return (JavaRDD<WindowedValue<Object>>) context.getOutputRDD(transform);
+      } finally {
+        // restore the previously-current transform even when evaluation throws, so the shared
+        // context is not left pointing at this transform
+        context.setCurrentTransform(existingAPT);
+      }
+    }
+  }
+
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+      final SparkPipelineTranslator rddTranslator) {
+    // Wraps the batch (RDD) evaluator of an output transform so that, when the transform has an
+    // input DStream, the evaluator runs on every RDD of it through DStream.foreachRDD().
+    return new TransformEvaluator<PT>() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void evaluate(PT transform, EvaluationContext context) {
+        Class<PT> transformClass = (Class<PT>) transform.getClass();
+        TransformEvaluator<PT> batchEvaluator = rddTranslator.translate(transformClass);
+        StreamingEvaluationContext streamingContext = (StreamingEvaluationContext) context;
+
+        if (!streamingContext.hasStream(transform)) {
+          // no stream registered for this transform: delegate to the RDD evaluator directly
+          batchEvaluator.evaluate(transform, context);
+          return;
+        }
+
+        JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>> inputStream =
+            (JavaDStreamLike<WindowedValue<Object>, ?, JavaRDD<WindowedValue<Object>>>)
+                streamingContext.getStream(transform);
+        inputStream.foreachRDD(
+            new RDDOutputOperator<>(streamingContext, batchEvaluator, transform));
+      }
+    };
+  }
+
+  /**
+   * Function passed to {@code DStream.foreachRDD()} that evaluates an output (RDD)
+   * transformation on each RDD of the input stream.
+   *
+   * @param <PT> PTransform type
+   */
+  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+      implements Function<JavaRDD<WindowedValue<Object>>, Void> {
+
+    private final StreamingEvaluationContext context;
+    // the context's current transform when this function was created; it is made current again
+    // for the duration of every call
+    private final AppliedPTransform<?, ?, ?> appliedPTransform;
+    private final TransformEvaluator<PT> rddEvaluator;
+    private final PT transform;
+
+    private RDDOutputOperator(StreamingEvaluationContext context,
+        TransformEvaluator<PT> rddEvaluator, PT transform) {
+      this.context = context;
+      this.appliedPTransform = context.getCurrentTransform();
+      this.rddEvaluator = rddEvaluator;
+      this.transform = transform;
+    }
+
+    /**
+     * Evaluates the wrapped transformation with {@code rdd} as its input.
+     *
+     * @param rdd the input RDD for this invocation
+     * @return always {@code null}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Void call(JavaRDD<WindowedValue<Object>> rdd) throws Exception {
+      AppliedPTransform<?, ?, ?> existingAPT = context.getCurrentTransform();
+      context.setCurrentTransform(appliedPTransform);
+      try {
+        context.setInputRDD(transform, rdd);
+        rddEvaluator.evaluate(transform, context);
+      } finally {
+        // restore the previously-current transform even when evaluation throws, so the shared
+        // context is not left pointing at this transform
+        context.setCurrentTransform(existingAPT);
+      }
+      return null;
+    }
+  }
+
+  private static final TransformTranslator.FieldGetter WINDOW_FG =
+      new TransformTranslator.FieldGetter(Window.Bound.class);
+
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+    return new TransformEvaluator<Window.Bound<T>>() {
+      @Override
+      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+        StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
+        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+
+        // Step 1: for fixed and sliding windows, apply the equivalent Spark window to the
+        // DStream. Any other WindowFn leaves the stream as it is.
+        @SuppressWarnings("unchecked")
+        JavaDStream<WindowedValue<T>> inputStream =
+            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        if (windowFn instanceof FixedWindows) {
+          FixedWindows fixed = (FixedWindows) windowFn;
+          Duration size = Durations.milliseconds(fixed.getSize().getMillis());
+          sec.setStream(transform, inputStream.window(size));
+        } else if (windowFn instanceof SlidingWindows) {
+          SlidingWindows sliding = (SlidingWindows) windowFn;
+          Duration size = Durations.milliseconds(sliding.getSize().getMillis());
+          Duration slide = Durations.milliseconds(sliding.getPeriod().getMillis());
+          sec.setStream(transform, inputStream.window(size, slide));
+        }
+
+        // Step 2: assign windows to the individual elements. The stream is read again because
+        // step 1 may have replaced it.
+        DoFn<T, T> assignWindows = new AssignWindowsDoFn<>(windowFn);
+        DoFnFunction<T, T> assignWindowsFn =
+            new DoFnFunction<>(assignWindows, sec.getRuntimeContext(), null);
+        @SuppressWarnings("unchecked")
+        JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> currentStream =
+            (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
+                sec.getStream(transform);
+        sec.setStream(transform, currentStream.mapPartitions(assignWindowsFn));
+      }
+    };
+  }
+
+  // Transforms that have a dedicated streaming evaluator, looked up by their exact class.
+  private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS =
+      Maps.newHashMap();
+
+  // Transforms that are rejected in streaming mode instead of falling back to an RDD evaluator.
+  private static final Set<Class<? extends PTransform>> UNSUPPORTED_EVALUATORS =
+      Sets.newHashSet();
+
+  static {
+    EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
+    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(Create.Values.class, create());
+    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
+    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+
+    //TODO - add support for the following
+    UNSUPPORTED_EVALUATORS.add(TextIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(TextIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(AvroIO.Write.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Read.Bound.class);
+    UNSUPPORTED_EVALUATORS.add(HadoopIO.Write.Bound.class);
+  }
+
+  /**
+   * Picks the streaming evaluator for {@code clazz}: a dedicated one if registered, otherwise the
+   * RDD evaluator from {@code rddTranslator} wrapped in foreachRDD (for transforms producing
+   * {@code PDone}) or transform (for everything else).
+   *
+   * @throws UnsupportedOperationException if {@code clazz} is listed as unsupported in streaming
+   */
+  @SuppressWarnings("unchecked")
+  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
+      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<PT> streamingEvaluator = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    if (streamingEvaluator != null) {
+      return streamingEvaluator;
+    }
+    if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
+      throw new UnsupportedOperationException("Dataflow transformation "
+          + clazz.getCanonicalName()
+          + " is currently unsupported by the Spark streaming pipeline");
+    }
+    // DStream transformations turn an RDD into another RDD, while actions create output; which
+    // one applies is decided by the PTransform's output class.
+    Class<?> outputClazz = getPTransformOutputClazz(clazz);
+    if (!PDone.class.equals(outputClazz)) {
+      return rddTransform(rddTranslator);
+    }
+    return foreachRDD(rddTranslator);
+  }
+
+ private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+ Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
+ return TypeToken.of(clazz).resolveType(types[1]).getRawType();
+ }
+
+  /**
+   * Streaming {@link SparkPipelineTranslator}: matches a Dataflow transformation with its Spark
+   * streaming evaluator. Transformations without a dedicated streaming evaluator reuse the
+   * evaluators of the wrapped RDD translator, applied inside DStream transform/foreachRDD.
+   */
+  public static class Translator implements SparkPipelineTranslator {
+
+    // batch (RDD) translator whose evaluators are wrapped for use on streams
+    private final SparkPipelineTranslator rddTranslator;
+
+    public Translator(SparkPipelineTranslator rddTranslator) {
+      this.rddTranslator = rddTranslator;
+    }
+
+    @Override
+    public boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz) {
+      // streaming covers its own evaluators plus every rdd translation
+      if (EVALUATORS.containsKey(clazz)) {
+        return true;
+      }
+      return rddTranslator.hasTranslation(clazz);
+    }
+
+    @Override
+    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+      return getTransformEvaluator(clazz, rddTranslator);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
new file mode 100644
index 0000000..6844011
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/streaming/StreamingWindowPipelineDetector.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.streaming;
+
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.TransformTranslator;
+
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+
+
+/**
+ * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing.
+ */
+public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator {
+
+ // Currently, Spark streaming recommends batches no smaller then 500 msec
+ private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
+
+ private boolean windowing;
+ private Duration batchDuration;
+
+ public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
+ super(translator);
+ }
+
+ private static final TransformTranslator.FieldGetter WINDOW_FG =
+ new TransformTranslator.FieldGetter(Window.Bound.class);
+
+ // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
+ @Override
+ protected <PT extends PTransform<? super PInput, POutput>> void
+ doVisitTransform(TransformTreeNode node) {
+ @SuppressWarnings("unchecked")
+ PT transform = (PT) node.getTransform();
+ @SuppressWarnings("unchecked")
+ Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+ if (transformClass.isAssignableFrom(Window.Bound.class)) {
+ WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
+ if (windowFn instanceof FixedWindows) {
+ setBatchDuration(((FixedWindows) windowFn).getSize());
+ } else if (windowFn instanceof SlidingWindows) {
+ if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
+ throw new UnsupportedOperationException("Spark does not support window offsets");
+ }
+ // Sliding window size might as well set the batch duration. Applying the transformation
+ // will add the "slide"
+ setBatchDuration(((SlidingWindows) windowFn).getSize());
+ } else if (!(windowFn instanceof GlobalWindows)) {
+ throw new IllegalStateException("Windowing function not supported: " + windowFn);
+ }
+ }
+ }
+
+ private void setBatchDuration(org.joda.time.Duration duration) {
+ Long durationMillis = duration.getMillis();
+ // validate window size
+ if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
+ throw new IllegalArgumentException("Windowing of size " + durationMillis +
+ "msec is not supported!");
+ }
+ // choose the smallest duration to be Spark's batch duration, larger ones will be handled
+ // as window functions over the batched-stream
+ if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
+ this.batchDuration = Durations.milliseconds(durationMillis);
+ }
+ windowing = true;
+ }
+
+ public boolean isWindowing() {
+ return windowing;
+ }
+
+ public Duration getBatchDuration() {
+ return batchDuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
new file mode 100644
index 0000000..af831c6
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package org.apache.beam.runners.spark.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a value to be broadcast to Spark executors, either via Spark serialization or encoded
+ * with a {@link Coder}, depending on the {@link #DIRECT_BROADCAST} system property.
+ *
+ * @param <T> the type of the broadcast value
+ */
+public abstract class BroadcastHelper<T> implements Serializable {
+
+  /**
+   * If the property {@code dataflow.spark.directBroadcast} is set to
+   * {@code true} then Spark serialization (Kryo) will be used to broadcast values
+   * in View objects. By default this property is not set, and values are coded using
+   * the appropriate {@link Coder}.
+   */
+  public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
+
+  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
+
+  /**
+   * Creates a helper for {@code value}: a {@link DirectBroadcastHelper} if the
+   * {@link #DIRECT_BROADCAST} system property is {@code true}, otherwise a
+   * {@link CodedBroadcastHelper} that encodes the value with {@code coder}.
+   */
+  public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) {
+    if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) {
+      return new DirectBroadcastHelper<>(value);
+    }
+    return new CodedBroadcastHelper<>(value, coder);
+  }
+
+  /** Returns the value, reading it from the broadcast variable if it is not held locally. */
+  public abstract T getValue();
+
+  /** Broadcasts the held value through {@code jsc}. */
+  public abstract void broadcast(JavaSparkContext jsc);
+
+  /**
+   * A {@link BroadcastHelper} that relies on the underlying
+   * Spark serialization (Kryo) to broadcast values. This is appropriate when
+   * broadcasting very large values, since no copy of the object is made.
+   *
+   * @param <T> the type of the broadcast value
+   */
+  static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<T> bcast;
+    // transient: not serialized with the helper; re-read from bcast when null
+    private transient T value;
+
+    DirectBroadcastHelper(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public synchronized T getValue() {
+      if (value == null) {
+        value = bcast.getValue();
+      }
+      return value;
+    }
+
+    @Override
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(value);
+    }
+  }
+
+  /**
+   * A {@link BroadcastHelper} that uses a
+   * {@link Coder} to encode values as byte arrays
+   * before broadcasting.
+   *
+   * @param <T> the type of the broadcast value
+   */
+  static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<byte[]> bcast;
+    private final Coder<T> coder;
+    // transient: not serialized with the helper; decoded from bcast when null
+    private transient T value;
+
+    CodedBroadcastHelper(T value, Coder<T> coder) {
+      this.value = value;
+      this.coder = coder;
+    }
+
+    @Override
+    public synchronized T getValue() {
+      if (value == null) {
+        value = deserialize();
+      }
+      return value;
+    }
+
+    @Override
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
+    }
+
+    /** Decodes the broadcast bytes with the coder; returns {@code null} if decoding fails. */
+    private T deserialize() {
+      try {
+        return coder.decode(new ByteArrayInputStream(bcast.value()),
+            new Coder.Context(true));
+      } catch (IOException ioe) {
+        // this should not ever happen; log it together with its stack trace if it does, and
+        // fall back to null as before
+        LOG.warn(ioe.getMessage(), ioe);
+        return null;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
new file mode 100644
index 0000000..7679b9c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.beam.runners.spark.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.primitives.UnsignedBytes;
+
+/**
+ * Serializable, comparable wrapper around a {@code byte[]} whose equality and hash code are
+ * based on the array contents, and whose ordering is lexicographic over unsigned bytes.
+ */
+public class ByteArray implements Serializable, Comparable<ByteArray> {
+
+  private final byte[] value;
+
+  /** Wraps {@code value} without copying it. */
+  public ByteArray(byte[] value) {
+    this.value = value;
+  }
+
+  /** Returns the wrapped array itself (not a copy). */
+  public byte[] getValue() {
+    return value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    ByteArray that = (ByteArray) o;
+    return Arrays.equals(this.value, that.value);
+  }
+
+  @Override
+  public int hashCode() {
+    // Arrays.hashCode(null) is 0, so a null array needs no special case
+    return Arrays.hashCode(value);
+  }
+
+  @Override
+  public int compareTo(ByteArray other) {
+    return UnsignedBytes.lexicographicalComparator().compare(this.value, other.value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
index 5733a86..98387a6 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-com.cloudera.dataflow.spark.SparkPipelineOptionsRegistrar
-com.cloudera.dataflow.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.SparkPipelineOptionsRegistrar
+org.apache.beam.runners.spark.streaming.SparkStreamingPipelineOptionsRegistrar
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
index 26e0b3a..972b1a3 100644
--- a/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
+++ b/runners/spark/src/main/resources/META-INF/services/com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-com.cloudera.dataflow.spark.SparkPipelineRunnerRegistrar
\ No newline at end of file
+org.apache.beam.runners.spark.SparkPipelineRunnerRegistrar
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
deleted file mode 100644
index 29a73b6..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/hadoop/WritableCoderTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.hadoop;
-
-import com.google.cloud.dataflow.sdk.testing.CoderProperties;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-/**
- * Tests for WritableCoder: each test checks a Hadoop Writable value with
- * CoderProperties.coderDecodeEncodeEqual.
- */
-public class WritableCoderTest {
-
- // Writable holding a plain int value.
- @Test
- public void testIntWritableEncoding() throws Exception {
- IntWritable value = new IntWritable(42);
- WritableCoder<IntWritable> coder = WritableCoder.of(IntWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, value);
- }
-
- // NullWritable instance obtained via NullWritable.get() (presumably a payload-free singleton).
- @Test
- public void testNullWritableEncoding() throws Exception {
- WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
-
- CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
deleted file mode 100644
index ea4cc38..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/AvroPipelineTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Lists;
-import com.google.common.io.Resources;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Writes one GenericRecord to an Avro file, runs a pipeline that reads it with AvroIO.Read and
- * writes it with AvroIO.Write on the SparkPipelineRunner, then checks the written record.
- */
-public class AvroPipelineTest {
-
- private File inputFile;
- private File outputDir;
-
- @Rule
- public final TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Before
- public void setUp() throws IOException {
- inputFile = tmpDir.newFile("test.avro");
- outputDir = tmpDir.newFolder("out");
- // only the path is kept: output is read back from outputDir + "-00000-of-00001"
- // (see readGenericFile)
- outputDir.delete();
- }
-
- @Test
- public void testGeneric() throws Exception {
- Schema schema = new Schema.Parser().parse(Resources.getResource("person.avsc").openStream());
- GenericRecord savedRecord = new GenericData.Record(schema);
- savedRecord.put("name", "John Doe");
- savedRecord.put("age", 42);
- savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
- populateGenericFile(Lists.newArrayList(savedRecord), schema);
-
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<GenericRecord> input = p.apply(
- AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
- input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- res.close();
-
- List<GenericRecord> records = readGenericFile();
- assertEquals(Lists.newArrayList(savedRecord), records);
- }
-
- // Writes genericRecords to inputFile as an Avro data file with the given schema.
- // NOTE(review): outputStream is opened outside the try-with-resources; it may leak if
- // create/append throws before the final close() -- consider moving it into the try.
- private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
- FileOutputStream outputStream = new FileOutputStream(this.inputFile);
- GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
-
- try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(genericDatumWriter)) {
- dataFileWriter.create(schema, outputStream);
- for (GenericRecord record : genericRecords) {
- dataFileWriter.append(record);
- }
- }
- outputStream.close();
- }
-
- // Reads all records from the single expected output shard of the pipeline.
- private List<GenericRecord> readGenericFile() throws IOException {
- List<GenericRecord> records = Lists.newArrayList();
- GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
- try (DataFileReader<GenericRecord> dataFileReader =
- new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) {
- for (GenericRecord record : dataFileReader) {
- records.add(record);
- }
- }
- return records;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
deleted file mode 100644
index 667e949..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombineGloballyTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Iterables;
-import java.util.Arrays;
-import java.util.List;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Runs Combine.globally with a CombineFn that joins all input strings with "," and checks the
- * single combined output.
- */
-public class CombineGloballyTest {
-
- private static final String[] WORDS_ARRAY = {
- "hi there", "hi", "hi sue bob",
- "hi sue", "", "bob hi"};
- private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
- @Test
- public void test() throws Exception {
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- Pipeline p = Pipeline.create(options);
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
- PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
-
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- // NOTE(review): res is not closed if this assertion fails; a try/finally would guarantee it
- assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
- res.close();
- }
-
- /** CombineFn that concatenates its String inputs, separated by ",". */
- public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
-
- @Override
- public StringBuilder createAccumulator() {
- // return null to differentiate from an empty string
- return null;
- }
-
- @Override
- public StringBuilder addInput(StringBuilder accumulator, String input) {
- return combine(accumulator, input);
- }
-
- // NOTE(review): accumulators are appended without the "," separator used by combine(), so
- // merging two non-empty accumulators yields e.g. "ab" rather than "a,b" -- verify intent.
- @Override
- public StringBuilder mergeAccumulators(Iterable<StringBuilder> accumulators) {
- StringBuilder sb = new StringBuilder();
- for (StringBuilder accum : accumulators) {
- if (accum != null) {
- sb.append(accum);
- }
- }
- return sb;
- }
-
- // NOTE(review): accumulator may be null (createAccumulator returns null), which would throw
- // a NullPointerException here, e.g. presumably for an empty input.
- @Override
- public String extractOutput(StringBuilder accumulator) {
- return accumulator.toString();
- }
-
- // Appends datum to accum with a "," separator, starting a new builder when accum is null.
- private static StringBuilder combine(StringBuilder accum, String datum) {
- if (accum == null) {
- return new StringBuilder(datum);
- } else {
- accum.append(",").append(datum);
- return accum;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
deleted file mode 100644
index f9d5b46..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/CombinePerKeyTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableList;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class CombinePerKeyTest {
-
- private static final List<String> WORDS =
- ImmutableList.of("the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog");
- @Test
- public void testRun() {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<String> inputWords = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
- PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>());
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- Map<String, Long> actualCnts = new HashMap<>();
- for (KV<String, Long> kv : res.get(cnts)) {
- actualCnts.put(kv.getKey(), kv.getValue());
- }
- res.close();
- Assert.assertEquals(8, actualCnts.size());
- Assert.assertEquals(Long.valueOf(2L), actualCnts.get("the"));
- }
-
- private static class SumPerKey<T> extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
- @Override
- public PCollection<KV<T, Long>> apply(PCollection<T> pcol) {
- PCollection<KV<T, Long>> withLongs = pcol.apply(ParDo.of(new DoFn<T, KV<T, Long>>() {
- @Override
- public void processElement(ProcessContext processContext) throws Exception {
- processContext.output(KV.of(processContext.element(), 1L));
- }
- })).setCoder(KvCoder.of(pcol.getCoder(), VarLongCoder.of()));
- return withLongs.apply(Sum.<T>longsPerKey());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
deleted file mode 100644
index 7495aeb..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DeDupTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.ImmutableSet;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import org.junit.Test;
-
-/**
- * A test based on {@code DeDupExample} from the SDK.
- */
-public class DeDupTest {
-
- private static final String[] LINES_ARRAY = {
- "hi there", "hello", "hi there",
- "hi", "hello"};
- private static final List<String> LINES = Arrays.asList(LINES_ARRAY);
- private static final Set<String> EXPECTED_SET =
- ImmutableSet.of("hi there", "hi", "hello");
-
- @Test
- public void testRun() throws Exception {
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- options.setRunner(SparkPipelineRunner.class);
- Pipeline p = Pipeline.create(options);
- PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
- PCollection<String> output = input.apply(RemoveDuplicates.<String>create());
-
- DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SET);
-
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- res.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
deleted file mode 100644
index 2b0947f..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/DoFnOutputTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import java.io.Serializable;
-import org.junit.Test;
-
-public class DoFnOutputTest implements Serializable {
- @Test
- public void test() throws Exception {
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- options.setRunner(SparkPipelineRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- PCollection<String> strings = pipeline.apply(Create.of("a"));
- // Test that values written from startBundle() and finishBundle() are written to
- // the output
- PCollection<String> output = strings.apply(ParDo.of(new DoFn<String, String>() {
- @Override
- public void startBundle(Context c) throws Exception {
- c.output("start");
- }
- @Override
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- @Override
- public void finishBundle(Context c) throws Exception {
- c.output("finish");
- }
- }));
-
- DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish");
-
- EvaluationResult res = SparkPipelineRunner.create().run(pipeline);
- res.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
deleted file mode 100644
index 6c89ca1..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/EmptyInputTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.collect.Iterables;
-import java.util.Collections;
-import java.util.List;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class EmptyInputTest {
-
- @Test
- public void test() throws Exception {
- SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
- Pipeline p = Pipeline.create(options);
- List<String> empty = Collections.emptyList();
- PCollection<String> inputWords = p.apply(Create.of(empty)).setCoder(StringUtf8Coder.of());
- PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords()));
-
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- assertEquals("", Iterables.getOnlyElement(res.get(output)));
- res.close();
- }
-
- public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
- @Override
- public String apply(Iterable<String> input) {
- StringBuilder all = new StringBuilder();
- for (String item : input) {
- if (!item.isEmpty()) {
- if (all.length() == 0) {
- all.append(item);
- } else {
- all.append(",");
- all.append(item);
- }
- }
- }
- return all.toString();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
deleted file mode 100644
index 579ada5..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.cloudera.dataflow.hadoop.HadoopIO;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import java.io.File;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.assertEquals;
-
-public class HadoopFileFormatPipelineTest {
-
- private File inputFile;
- private File outputFile;
-
- @Rule
- public final TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Before
- public void setUp() throws IOException {
- inputFile = tmpDir.newFile("test.seq");
- outputFile = tmpDir.newFolder("out");
- outputFile.delete();
- }
-
- @Test
- public void testSequenceFile() throws Exception {
- populateFile();
-
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- @SuppressWarnings("unchecked")
- Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
- (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
- HadoopIO.Read.Bound<IntWritable,Text> read =
- HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
- PCollection<KV<IntWritable, Text>> input = p.apply(read);
- @SuppressWarnings("unchecked")
- Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
- (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
- @SuppressWarnings("unchecked")
- HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
- outputFormatClass, IntWritable.class, Text.class);
- input.apply(write.withoutSharding());
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- res.close();
-
- IntWritable key = new IntWritable();
- Text value = new Text();
- try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
- int i = 0;
- while (reader.next(key, value)) {
- assertEquals(i, key.get());
- assertEquals("value-" + i, value.toString());
- i++;
- }
- }
- }
-
- private void populateFile() throws IOException {
- IntWritable key = new IntWritable();
- Text value = new Text();
- try (Writer writer = SequenceFile.createWriter(
- new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(this.inputFile.toURI())))) {
- for (int i = 0; i < 5; i++) {
- key.set(i);
- value.set("value-" + i);
- writer.append(key, value);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41c4ca6a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
deleted file mode 100644
index 2df8493..0000000
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/MultiOutputWordCountTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.dataflow.spark;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.ApproximateUnique;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.collect.Iterables;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class MultiOutputWordCountTest {
-
- private static final TupleTag<String> upper = new TupleTag<>();
- private static final TupleTag<String> lower = new TupleTag<>();
- private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
- private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
-
- @Test
- public void testRun() throws Exception {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
- PCollection<String> regex = p.apply(Create.of("[^a-zA-Z']+"));
- PCollection<String> w1 = p.apply(Create.of("Here are some words to count", "and some others"));
- PCollection<String> w2 = p.apply(Create.of("Here are some more words", "and even more words"));
- PCollectionList<String> list = PCollectionList.of(w1).and(w2);
-
- PCollection<String> union = list.apply(Flatten.<String>pCollections());
- PCollectionView<String> regexView = regex.apply(View.<String>asSingleton());
- CountWords countWords = new CountWords(regexView);
- PCollectionTuple luc = union.apply(countWords);
- PCollection<Long> unique = luc.get(lowerCnts).apply(
- ApproximateUnique.<KV<String, Long>>globally(16));
-
- EvaluationResult res = SparkPipelineRunner.create().run(p);
- Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
- Assert.assertEquals("are", actualLower.iterator().next().getKey());
- Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
- Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
- Iterable<Long> actualUniqCount = res.get(unique);
- Assert.assertEquals(9, (long) actualUniqCount.iterator().next());
- int actualTotalWords = res.getAggregatorValue("totalWords", Integer.class);
- Assert.assertEquals(18, actualTotalWords);
- int actualMaxWordLength = res.getAggregatorValue("maxWordLength", Integer.class);
- Assert.assertEquals(6, actualMaxWordLength);
- AggregatorValues<Integer> aggregatorValues = res.getAggregatorValues(countWords
- .getTotalWordsAggregator());
- Assert.assertEquals(18, Iterables.getOnlyElement(aggregatorValues.getValues()).intValue());
-
- res.close();
- }
-
- /**
- * A DoFn that tokenizes lines of text into individual words.
- */
- static class ExtractWordsFn extends DoFn<String, String> {
-
- private final Aggregator<Integer, Integer> totalWords = createAggregator("totalWords",
- new Sum.SumIntegerFn());
- private final Aggregator<Integer, Integer> maxWordLength = createAggregator("maxWordLength",
- new Max.MaxIntegerFn());
- private final PCollectionView<String> regex;
-
- ExtractWordsFn(PCollectionView<String> regex) {
- this.regex = regex;
- }
-
- @Override
- public void processElement(ProcessContext c) {
- String[] words = c.element().split(c.sideInput(regex));
- for (String word : words) {
- totalWords.addValue(1);
- if (!word.isEmpty()) {
- maxWordLength.addValue(word.length());
- if (Character.isLowerCase(word.charAt(0))) {
- c.output(word);
- } else {
- c.sideOutput(upper, word);
- }
- }
- }
- }
- }
-
- public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
-
- private final PCollectionView<String> regex;
- private final ExtractWordsFn extractWordsFn;
-
- public CountWords(PCollectionView<String> regex) {
- this.regex = regex;
- this.extractWordsFn = new ExtractWordsFn(regex);
- }
-
- @Override
- public PCollectionTuple apply(PCollection<String> lines) {
- // Convert lines of text into individual words.
- PCollectionTuple lowerUpper = lines
- .apply(ParDo.of(extractWordsFn)
- .withSideInputs(regex)
- .withOutputTags(lower, TupleTagList.of(upper)));
- lowerUpper.get(lower).setCoder(StringUtf8Coder.of());
- lowerUpper.get(upper).setCoder(StringUtf8Coder.of());
- PCollection<KV<String, Long>> lowerCounts = lowerUpper.get(lower).apply(Count
- .<String>perElement());
- PCollection<KV<String, Long>> upperCounts = lowerUpper.get(upper).apply(Count
- .<String>perElement());
- return PCollectionTuple
- .of(lowerCnts, lowerCounts)
- .and(upperCnts, upperCounts);
- }
-
- Aggregator<Integer, Integer> getTotalWordsAggregator() {
- return extractWordsFn.totalWords;
- }
- }
-}