Posted to commits@beam.apache.org by jb...@apache.org on 2016/05/08 06:06:14 UTC
[1/2] incubator-beam git commit: [BEAM-267] Enable checkstyle in Spark runner
Repository: incubator-beam
Updated Branches:
refs/heads/master ff825b077 -> 0f3b05335
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 1c83700..afcca93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -18,13 +18,6 @@
package org.apache.beam.runners.spark.translation.streaming;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.Pipeline;
@@ -35,6 +28,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
+
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
@@ -43,6 +37,13 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
* Streaming evaluation context helps to handle streaming.
@@ -179,11 +180,11 @@ public class StreamingEvaluationContext extends EvaluationContext {
//---------------- override in order to expose in package
@Override
- protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
return super.getInput(transform);
}
@Override
- protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+ protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
return super.getOutput(transform);
}
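The import hunks above, and the similar ones in the files that follow, do not change behavior; they only regroup imports into the order the newly enabled checkstyle configuration expects. Judging from the diffs, that order appears to be org.apache.beam first, then other third-party groups (com.google, org.apache.spark), then java.*, with remaining packages such as kafka.* and scala.* last (an inference from this commit, not a quote of the project's checkstyle.xml). A skeletal sketch of the resulting layout, with made-up class and field names:

package org.apache.beam.runners.spark.translation.streaming;

// Beam's own classes come first.
import org.apache.beam.sdk.Pipeline;

// Other third-party groups follow, separated by blank lines.
import org.apache.spark.api.java.JavaRDD;

// java.* imports move toward the bottom of the list.
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative skeleton only; not part of the commit. */
class ImportOrderExample {
  // Hypothetical fields, present only to exercise the imports above.
  private final Map<String, JavaRDD<?>> streamsByName = new LinkedHashMap<>();
  private Pipeline pipeline;
}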
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index d9daeb0..c1ecc43 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -17,18 +17,6 @@
*/
package org.apache.beam.runners.spark.translation.streaming;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.api.client.util.Lists;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
-import com.google.common.reflect.TypeToken;
-import kafka.serializer.Decoder;
import org.apache.beam.runners.spark.io.ConsoleIO;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.io.KafkaIO;
@@ -58,6 +46,12 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.common.reflect.TypeToken;
+
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
@@ -68,6 +62,15 @@ import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.serializer.Decoder;
import scala.Tuple2;
@@ -173,14 +176,14 @@ public final class StreamingTransformTranslator {
};
}
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+ private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(
final SparkPipelineTranslator rddTranslator) {
- return new TransformEvaluator<PT>() {
+ return new TransformEvaluator<TransformT>() {
@SuppressWarnings("unchecked")
@Override
- public void evaluate(PT transform, EvaluationContext context) {
- TransformEvaluator<PT> rddEvaluator =
- rddTranslator.translate((Class<PT>) transform.getClass());
+ public void evaluate(TransformT transform, EvaluationContext context) {
+ TransformEvaluator<TransformT> rddEvaluator =
+ rddTranslator.translate((Class<TransformT>) transform.getClass());
StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
if (sec.hasStream(transform)) {
@@ -203,19 +206,20 @@ public final class StreamingTransformTranslator {
* RDD transform function. If the transformation function doesn't have an input, create a fake one
* as an empty RDD.
*
- * @param <PT> PTransform type
+ * @param <TransformT> PTransform type
*/
- private static final class RDDTransform<PT extends PTransform<?, ?>>
+ private static final class RDDTransform<TransformT extends PTransform<?, ?>>
implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
private final StreamingEvaluationContext context;
private final AppliedPTransform<?, ?, ?> appliedPTransform;
- private final TransformEvaluator<PT> rddEvaluator;
- private final PT transform;
+ private final TransformEvaluator<TransformT> rddEvaluator;
+ private final TransformT transform;
- private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
- PT transform) {
+ private RDDTransform(StreamingEvaluationContext context,
+ TransformEvaluator<TransformT> rddEvaluator,
+ TransformT transform) {
this.context = context;
this.appliedPTransform = context.getCurrentTransform();
this.rddEvaluator = rddEvaluator;
@@ -243,13 +247,13 @@ public final class StreamingTransformTranslator {
}
@SuppressWarnings("unchecked")
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+ private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(
final SparkPipelineTranslator rddTranslator) {
- return new TransformEvaluator<PT>() {
+ return new TransformEvaluator<TransformT>() {
@Override
- public void evaluate(PT transform, EvaluationContext context) {
- TransformEvaluator<PT> rddEvaluator =
- rddTranslator.translate((Class<PT>) transform.getClass());
+ public void evaluate(TransformT transform, EvaluationContext context) {
+ TransformEvaluator<TransformT> rddEvaluator =
+ rddTranslator.translate((Class<TransformT>) transform.getClass());
StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
if (sec.hasStream(transform)) {
@@ -268,19 +272,19 @@ public final class StreamingTransformTranslator {
/**
* RDD output function.
*
- * @param <PT> PTransform type
+ * @param <TransformT> PTransform type
*/
- private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+ private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>>
implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
private final StreamingEvaluationContext context;
private final AppliedPTransform<?, ?, ?> appliedPTransform;
- private final TransformEvaluator<PT> rddEvaluator;
- private final PT transform;
+ private final TransformEvaluator<TransformT> rddEvaluator;
+ private final TransformT transform;
private RDDOutputOperator(StreamingEvaluationContext context,
- TransformEvaluator<PT> rddEvaluator, PT transform) {
+ TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
this.context = context;
this.appliedPTransform = context.getCurrentTransform();
this.rddEvaluator = rddEvaluator;
@@ -325,7 +329,7 @@ public final class StreamingTransformTranslator {
//--- then we apply windowing to the elements
DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
- ((StreamingEvaluationContext)context).getRuntimeContext(), null);
+ ((StreamingEvaluationContext) context).getRuntimeContext(), null);
@SuppressWarnings("unchecked")
JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
(JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
@@ -361,9 +365,10 @@ public final class StreamingTransformTranslator {
}
@SuppressWarnings("unchecked")
- private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
- getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
- TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+ private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+ getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator rddTranslator) {
+ TransformEvaluator<TransformT> transform =
+ (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
if (transform == null) {
if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
throw new UnsupportedOperationException("Dataflow transformation " + clazz
@@ -383,7 +388,8 @@ public final class StreamingTransformTranslator {
return transform;
}
- private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+ private static <TransformT extends PTransform<?, ?>> Class<?>
+ getPTransformOutputClazz(Class<TransformT> clazz) {
Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
return TypeToken.of(clazz).resolveType(types[1]).getRawType();
}
@@ -407,7 +413,8 @@ public final class StreamingTransformTranslator {
}
@Override
- public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+ public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+ translate(Class<TransformT> clazz) {
return getTransformEvaluator(clazz, rddTranslator);
}
}
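Most of the churn in this file is the rename of terse type parameters (PT here; OT, IT and IN/INTER/OUT elsewhere in the commit) to descriptive names ending in T, the convention used across the Beam SDK. Whether checkstyle itself enforces the naming pattern or the rename simply rides along with the cleanup is not visible from the diff. A minimal before/after sketch, reusing types that appear in this commit (the interface names are invented):

import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.sdk.transforms.PTransform;

interface TranslatorBefore {
  // Before: terse type-parameter name, hard to read at call sites.
  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
}

interface TranslatorAfter {
  // After: a descriptive name ending in "T", wrapped to stay within the line limit.
  <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
      translate(Class<TransformT> clazz);
}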
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
index 8c018d3..6e36102 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -54,12 +54,12 @@ public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.E
// Use the smallest window (fixed or sliding) as Spark streaming's batch duration
@Override
- protected <PT extends PTransform<? super PInput, POutput>> void
+ protected <TransformT extends PTransform<? super PInput, POutput>> void
doVisitTransform(TransformTreeNode node) {
@SuppressWarnings("unchecked")
- PT transform = (PT) node.getTransform();
+ TransformT transform = (TransformT) node.getTransform();
@SuppressWarnings("unchecked")
- Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+ Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
if (transformClass.isAssignableFrom(Window.Bound.class)) {
WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
if (windowFn instanceof FixedWindows) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 1824a9d..d3fa05a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -18,17 +18,21 @@
package org.apache.beam.runners.spark.util;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Broadcast helper.
+ */
public abstract class BroadcastHelper<T> implements Serializable {
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
index b1254d4..8c493f5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.runners.spark.util;
+import com.google.common.primitives.UnsignedBytes;
+
import java.io.Serializable;
import java.util.Arrays;
-import com.google.common.primitives.UnsignedBytes;
-
+/**
+ * Serializable byte array.
+ */
public class ByteArray implements Serializable, Comparable<ByteArray> {
private final byte[] value;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 9a8aa2e..654614a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PInput;
+/**
+ * A {@link PTransform} wrapping another transform.
+ */
public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
private PTransform<PInput, PCollection<T>> transform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index f9b00cc..7b25e34 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -35,6 +35,9 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
+/**
+ * Empty input test.
+ */
public class EmptyInputTest {
@Test
@@ -51,6 +54,9 @@ public class EmptyInputTest {
res.close();
}
+ /**
+ * Concat words serializable function used in the test.
+ */
public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
@Override
public String apply(Iterable<String> input) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 4e9c0b8..eee120e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,10 +18,13 @@
package org.apache.beam.runners.spark;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
@@ -32,16 +35,14 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.rules.TemporaryFolder;
-import org.junit.Rule;
-import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.Arrays;
@@ -49,6 +50,9 @@ import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
+/**
+ * Simple word count test.
+ */
public class SimpleWordCountTest {
private static final String[] WORDS_ARRAY = {
"hi there", "hi", "hi sue bob",
@@ -133,6 +137,9 @@ public class SimpleWordCountTest {
}
}
+ /**
+ * A {@link PTransform} counting words.
+ */
public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
@Override
public PCollection<String> apply(PCollection<String> lines) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 3643bac..88f4a06 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -18,19 +18,21 @@
package org.apache.beam.runners.spark;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.ServiceLoader;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
/**
* Test {@link SparkRunnerRegistrar}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 693e2c6..f358878 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -47,6 +47,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
+/**
+ * Avro pipeline test.
+ */
public class AvroPipelineTest {
private File inputFile;
@@ -82,7 +85,8 @@ public class AvroPipelineTest {
assertEquals(Lists.newArrayList(savedRecord), records);
}
- private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+ private void populateGenericFile(List<GenericRecord> genericRecords,
+ Schema schema) throws IOException {
FileOutputStream outputStream = new FileOutputStream(this.inputFile);
GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 85eeabd..8ce35c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -49,6 +49,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
+/**
+ * Number of shards test.
+ */
public class NumShardsTest {
private static final String[] WORDS_ARRAY = {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 0c8c6fc..eaa508c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -47,6 +47,9 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+/**
+ * Hadoop file format pipeline test.
+ */
public class HadoopFileFormatPipelineTest {
private File inputFile;
@@ -69,16 +72,21 @@ public class HadoopFileFormatPipelineTest {
Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
@SuppressWarnings("unchecked")
Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
- (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
- HadoopIO.Read.Bound<IntWritable,Text> read =
- HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
+ (Class<? extends FileInputFormat<IntWritable, Text>>)
+ (Class<?>) SequenceFileInputFormat.class;
+ HadoopIO.Read.Bound<IntWritable, Text> read =
+ HadoopIO.Read.from(inputFile.getAbsolutePath(),
+ inputFormatClass,
+ IntWritable.class,
+ Text.class);
PCollection<KV<IntWritable, Text>> input = p.apply(read)
.setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
@SuppressWarnings("unchecked")
Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
- (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
+ (Class<? extends FileOutputFormat<IntWritable, Text>>)
+ (Class<?>) TemplatedSequenceFileOutputFormat.class;
@SuppressWarnings("unchecked")
- HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
+ HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
outputFormatClass, IntWritable.class, Text.class);
input.apply(write.withoutSharding());
EvaluationResult res = SparkPipelineRunner.create().run(p);
@@ -86,7 +94,8 @@ public class HadoopFileFormatPipelineTest {
IntWritable key = new IntWritable();
Text value = new Text();
- try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
+ try (Reader reader = new Reader(new Configuration(),
+ Reader.file(new Path(outputFile.toURI())))) {
int i = 0;
while (reader.next(key, value)) {
assertEquals(i, key.get());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
index 55991a4..e1620db 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -28,6 +28,9 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
+/**
+ * Tests for the {@link ShardNameBuilder}.
+ */
public class ShardNameBuilderTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index a644673..ac64540 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -37,6 +37,9 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
+/**
+ * Combine globally test.
+ */
public class CombineGloballyTest {
private static final String[] WORDS_ARRAY = {
@@ -53,10 +56,14 @@ public class CombineGloballyTest {
PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
EvaluationResult res = SparkPipelineRunner.create().run(p);
- assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
+ assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
+ Iterables.getOnlyElement(res.get(output)));
res.close();
}
+ /**
+ * Word merger combine function used in the test.
+ */
public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
@Override
@@ -83,7 +90,7 @@ public class CombineGloballyTest {
@Override
public String extractOutput(StringBuilder accumulator) {
- return accumulator != null ? accumulator.toString(): "";
+ return accumulator != null ? accumulator.toString() : "";
}
private static StringBuilder combine(StringBuilder accum, String datum) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 4e0bc5d..4e6c888 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -42,6 +42,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * Combine per key function test.
+ */
public class CombinePerKeyTest {
private static final List<String> WORDS =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index ca97a96..0334bfe 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -33,6 +33,9 @@ import org.junit.Test;
import java.io.Serializable;
+/**
+ * DoFn output test.
+ */
public class DoFnOutputTest implements Serializable {
@Test
public void test() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 6a862c9..3402bb4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -52,6 +52,9 @@ import org.junit.Test;
import java.util.Set;
+/**
+ * Multi-output word count test.
+ */
public class MultiOutputWordCountTest {
private static final TupleTag<String> upper = new TupleTag<>();
@@ -128,6 +131,9 @@ public class MultiOutputWordCountTest {
}
}
+ /**
+ * Count words {@link PTransform} used in the test.
+ */
public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
private final PCollectionView<String> regex;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 75d3fb2..22a2241 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -51,8 +51,14 @@ import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
+/**
+ * Serialization test.
+ */
public class SerializationTest {
+ /**
+ * Simple String holder.
+ */
public static class StringHolder { // not serializable
private final String string;
@@ -80,12 +86,17 @@ public class SerializationTest {
}
}
+ /**
+ * Simple UTF-8 coder for the String holder.
+ */
public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
@Override
- public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException {
+ public void encode(StringHolder value,
+ OutputStream outStream,
+ Context context) throws IOException {
stringUtf8Coder.encode(value.toString(), outStream, context);
}
@@ -171,7 +182,8 @@ public class SerializationTest {
}
}
- private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
+ private static class CountWords
+ extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
@Override
public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 14abbfc..5674900 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -38,6 +38,9 @@ import org.junit.Test;
import java.io.Serializable;
import java.net.URI;
+/**
+ * Side effects test.
+ */
public class SideEffectsTest implements Serializable {
static class UserException extends RuntimeException {
@@ -66,7 +69,8 @@ public class SideEffectsTest implements Serializable {
// TODO: remove the version check (and the setup and teardown methods) when we no
// longer support Spark 1.3 or 1.4
- String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
+ String version = SparkContextFactory.getSparkContext(options.getSparkMaster(),
+ options.getAppName()).version();
if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
assertTrue(e.getCause() instanceof UserException);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
index bf18486..59888c2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Simple test on the Spark runner pipeline options.
+ */
public class SparkPipelineOptionsTest {
@Test
public void testDefaultCreateMethod() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 0db8913..8062658 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -39,6 +39,9 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
+/**
+ * Windowed word count test.
+ */
public class WindowedWordCountTest {
private static final String[] WORDS_ARRAY = {
"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 9152d72..15b2f39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -63,7 +63,7 @@ public class FlattenStreamingTest {
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
- options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+ options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
PCollection<String> w1 =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index e1ff227..fd75e74 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -99,7 +99,7 @@ public class KafkaStreamingTest {
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
- options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+ options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
Map<String, String> kafkaParams = ImmutableMap.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index ef224da..28133ca 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -42,6 +42,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+/**
+ * Simple word count streaming test.
+ */
public class SimpleStreamingWordCountTest {
private static final String[] WORDS_ARRAY = {
@@ -58,7 +61,7 @@ public class SimpleStreamingWordCountTest {
PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
options.setAppName(this.getClass().getSimpleName());
options.setRunner(SparkPipelineRunner.class);
- options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+ options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 2ade467..0fec573 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -39,6 +39,7 @@ import kafka.server.KafkaServer;
import kafka.utils.Time;
/**
+ * Embedded Kafka cluster.
* https://gist.github.com/fjavieralba/7930018
*/
public class EmbeddedKafkaCluster {
@@ -169,6 +170,9 @@ public class EmbeddedKafkaCluster {
return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}";
}
+ /**
+ * Embedded Zookeeper.
+ */
public static class EmbeddedZookeeper {
private int port = -1;
private int tickTime = 500;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 041cc50..3d8fc32 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -26,8 +26,9 @@ import org.junit.Assert;
* success/failure counters.
*/
public final class PAssertStreaming {
+
/**
- * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}
+ * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
*/
static final String SUCCESS_COUNTER = "PAssertSuccess";
static final String FAILURE_COUNTER = "PAssertFailure";
[2/2] incubator-beam git commit: [BEAM-267] Enable checkstyle in Spark runner
Posted by jb...@apache.org.
[BEAM-267] Enable checkstyle in Spark runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f3b0533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f3b0533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f3b0533
Branch: refs/heads/master
Commit: 0f3b053356f3321d08d4e2ee457a037df778bee4
Parents: ff825b0
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri May 6 17:47:46 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 8 07:59:26 2016 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 2 -
.../runners/spark/SparkPipelineOptions.java | 3 +
.../beam/runners/spark/SparkPipelineRunner.java | 13 +-
.../runners/spark/SparkRunnerRegistrar.java | 5 +-
.../spark/aggregators/AggAccumParam.java | 4 +
.../spark/aggregators/NamedAggregators.java | 64 +++--
.../beam/runners/spark/coders/CoderHelpers.java | 13 +-
.../runners/spark/coders/NullWritableCoder.java | 10 +-
.../runners/spark/coders/WritableCoder.java | 19 +-
.../apache/beam/runners/spark/io/ConsoleIO.java | 7 +
.../beam/runners/spark/io/CreateStream.java | 6 +-
.../apache/beam/runners/spark/io/KafkaIO.java | 18 +-
.../beam/runners/spark/io/hadoop/HadoopIO.java | 27 +-
.../spark/io/hadoop/ShardNameBuilder.java | 7 +-
.../io/hadoop/ShardNameTemplateHelper.java | 7 +-
.../io/hadoop/TemplatedAvroKeyOutputFormat.java | 9 +-
.../TemplatedSequenceFileOutputFormat.java | 7 +-
.../io/hadoop/TemplatedTextOutputFormat.java | 7 +-
.../runners/spark/translation/DoFnFunction.java | 38 +--
.../spark/translation/EvaluationContext.java | 26 +-
.../spark/translation/MultiDoFnFunction.java | 42 +--
.../spark/translation/SparkContextFactory.java | 3 +
.../translation/SparkPipelineEvaluator.java | 13 +-
.../translation/SparkPipelineTranslator.java | 3 +-
.../spark/translation/SparkProcessContext.java | 72 ++---
.../spark/translation/SparkRuntimeContext.java | 51 ++--
.../spark/translation/TransformEvaluator.java | 11 +-
.../spark/translation/TransformTranslator.java | 271 +++++++++++--------
.../streaming/StreamingEvaluationContext.java | 19 +-
.../streaming/StreamingTransformTranslator.java | 85 +++---
.../StreamingWindowPipelineDetector.java | 6 +-
.../runners/spark/util/BroadcastHelper.java | 12 +-
.../beam/runners/spark/util/ByteArray.java | 7 +-
.../util/SinglePrimitiveOutputPTransform.java | 3 +
.../beam/runners/spark/EmptyInputTest.java | 6 +
.../beam/runners/spark/SimpleWordCountTest.java | 21 +-
.../runners/spark/SparkRunnerRegistrarTest.java | 12 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 6 +-
.../beam/runners/spark/io/NumShardsTest.java | 3 +
.../io/hadoop/HadoopFileFormatPipelineTest.java | 21 +-
.../spark/io/hadoop/ShardNameBuilderTest.java | 3 +
.../spark/translation/CombineGloballyTest.java | 11 +-
.../spark/translation/CombinePerKeyTest.java | 3 +
.../spark/translation/DoFnOutputTest.java | 3 +
.../translation/MultiOutputWordCountTest.java | 6 +
.../spark/translation/SerializationTest.java | 16 +-
.../spark/translation/SideEffectsTest.java | 6 +-
.../translation/SparkPipelineOptionsTest.java | 3 +
.../translation/WindowedWordCountTest.java | 3 +
.../streaming/FlattenStreamingTest.java | 2 +-
.../streaming/KafkaStreamingTest.java | 2 +-
.../streaming/SimpleStreamingWordCountTest.java | 5 +-
.../streaming/utils/EmbeddedKafkaCluster.java | 4 +
.../streaming/utils/PAssertStreaming.java | 3 +-
54 files changed, 634 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e673246..5daf1e1 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -211,12 +211,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
- <!-- Checkstyle errors for now
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
- -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index bdf832b..091382e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
+/**
+ * Spark runner pipeline options.
+ */
public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
ApplicationNameOptions {
@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 8635cfb..bae4e53 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -120,14 +120,14 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
*/
@SuppressWarnings("rawtypes")
@Override
- public <OT extends POutput, IT extends PInput> OT apply(
- PTransform<IT, OT> transform, IT input) {
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
if (transform instanceof GroupByKey) {
- return (OT) ((PCollection) input).apply(
+ return (OutputT) ((PCollection) input).apply(
new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
} else if (transform instanceof Create.Values) {
- return (OT) super.apply(
+ return (OutputT) super.apply(
new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
} else {
return super.apply(transform, input);
@@ -216,6 +216,9 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
}
+ /**
+ * Evaluator of the pipeline.
+ */
public abstract static class Evaluator implements Pipeline.PipelineVisitor {
protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
@@ -275,7 +278,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
doVisitTransform(node);
}
- protected abstract <PT extends PTransform<? super PInput, POutput>> void
+ protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
doVisitTransform(TransformTreeNode node);
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 30142f9..9537ec6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -18,13 +18,14 @@
package org.apache.beam.runners.spark;
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
/**
* Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
* {@link SparkPipelineRunner}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
index a75aeb3..9ce8b33 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
@@ -20,7 +20,11 @@ package org.apache.beam.runners.spark.aggregators;
import org.apache.spark.AccumulatorParam;
+/**
+ * Aggregator accumulator param.
+ */
public class AggAccumParam implements AccumulatorParam<NamedAggregators> {
+
@Override
public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) {
return current.merge(added);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 64c473e..6ab6dc9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -18,6 +18,13 @@
package org.apache.beam.runners.spark.aggregators;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+
+import com.google.common.collect.ImmutableList;
+
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -25,12 +32,6 @@ import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine;
-
/**
* This class wraps a map of named aggregators. Spark expects that all accumulators be declared
* before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
@@ -96,8 +97,10 @@ public class NamedAggregators implements Serializable {
* so require some casting.
*/
@SuppressWarnings("unchecked")
- private static <A, B, C> State<A, B, C> merge(State<?, ?, ?> s1, State<?, ?, ?> s2) {
- return ((State<A, B, C>) s1).merge((State<A, B, C>) s2);
+ private static <InputT, InterT, OutputT> State<InputT, InterT, OutputT> merge(
+ State<?, ?, ?> s1,
+ State<?, ?, ?> s2) {
+ return ((State<InputT, InterT, OutputT>) s1).merge((State<InputT, InterT, OutputT>) s2);
}
@Override
@@ -110,38 +113,39 @@ public class NamedAggregators implements Serializable {
}
/**
- * @param <IN> Input data type
- * @param <INTER> Intermediate data type (useful for averages)
- * @param <OUT> Output data type
+ * @param <InputT> Input data type
+ * @param <InterT> Intermediate data type (useful for averages)
+ * @param <OutputT> Output data type
*/
- public interface State<IN, INTER, OUT> extends Serializable {
+ public interface State<InputT, InterT, OutputT> extends Serializable {
/**
* @param element new element to update state
*/
- void update(IN element);
+ void update(InputT element);
- State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other);
+ State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other);
- INTER current();
+ InterT current();
- OUT render();
+ OutputT render();
- Combine.CombineFn<IN, INTER, OUT> getCombineFn();
+ Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
}
/**
* => combineFunction in data flow.
*/
- public static class CombineFunctionState<IN, INTER, OUT> implements State<IN, INTER, OUT> {
+ public static class CombineFunctionState<InputT, InterT, OutputT>
+ implements State<InputT, InterT, OutputT> {
- private Combine.CombineFn<IN, INTER, OUT> combineFn;
- private Coder<IN> inCoder;
+ private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
+ private Coder<InputT> inCoder;
private SparkRuntimeContext ctxt;
- private transient INTER state;
+ private transient InterT state;
public CombineFunctionState(
- Combine.CombineFn<IN, INTER, OUT> combineFn,
- Coder<IN> inCoder,
+ Combine.CombineFn<InputT, InterT, OutputT> combineFn,
+ Coder<InputT> inCoder,
SparkRuntimeContext ctxt) {
this.combineFn = combineFn;
this.inCoder = inCoder;
@@ -150,28 +154,28 @@ public class NamedAggregators implements Serializable {
}
@Override
- public void update(IN element) {
+ public void update(InputT element) {
combineFn.addInput(state, element);
}
@Override
- public State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other) {
+ public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
return this;
}
@Override
- public INTER current() {
+ public InterT current() {
return state;
}
@Override
- public OUT render() {
+    public OutputT render() {
return combineFn.extractOutput(state);
}
@Override
- public Combine.CombineFn<IN, INTER, OUT> getCombineFn() {
+    public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
return combineFn;
}
@@ -190,8 +194,8 @@ public class NamedAggregators implements Serializable {
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ctxt = (SparkRuntimeContext) ois.readObject();
- combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
- inCoder = (Coder<IN>) ois.readObject();
+      combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
+ inCoder = (Coder<InputT>) ois.readObject();
try {
state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
.decode(ois, Coder.Context.NESTED);
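
To make the renamed type parameters concrete, here is a minimal sketch of a
State implementation: a hypothetical running integer sum (not part of this
patch) in which InputT, InterT and OutputT are all Integer.

    import org.apache.beam.runners.spark.aggregators.NamedAggregators;
    import org.apache.beam.sdk.transforms.Combine;

    // Illustrative only; real states are created via CombineFunctionState.
    class SumState implements NamedAggregators.State<Integer, Integer, Integer> {
      private int sum;

      @Override
      public void update(Integer element) {
        sum += element; // fold one input element into the running sum
      }

      @Override
      public NamedAggregators.State<Integer, Integer, Integer> merge(
          NamedAggregators.State<Integer, Integer, Integer> other) {
        sum += other.current(); // combine two partial sums, as merge() above does
        return this;
      }

      @Override
      public Integer current() {
        return sum; // the intermediate (InterT) value
      }

      @Override
      public Integer render() {
        return sum; // the final (OutputT) value
      }

      @Override
      public Combine.CombineFn<Integer, Integer, Integer> getCombineFn() {
        // A real implementation returns the CombineFn backing this state.
        throw new UnsupportedOperationException("not needed for this sketch");
      }
    }
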
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index 7dc6af6..07587fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -18,17 +18,20 @@
package org.apache.beam.runners.spark.coders;
+import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.sdk.coders.Coder;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.util.ByteArray;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
index 529d67b..7cff325 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
@@ -18,13 +18,17 @@
package org.apache.beam.runners.spark.coders;
-import java.io.InputStream;
-import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.hadoop.io.NullWritable;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Simple writable coder for {@link NullWritable}.
+ */
public final class NullWritableCoder extends WritableCoder<NullWritable> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
index f2836fe..4719e46 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
@@ -18,6 +18,16 @@
package org.apache.beam.runners.spark.coders;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -26,15 +36,6 @@ import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
/**
* A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index 41dc367..eefea77 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -29,6 +29,9 @@ public final class ConsoleIO {
private ConsoleIO() {
}
+ /**
+ * Write to the console.
+ */
public static final class Write {
private Write() {
@@ -42,6 +45,10 @@ public final class ConsoleIO {
return new Unbound<>(num);
}
+ /**
+ * {@link PTransform} writing a {@link PCollection} to the console.
+ * @param <T> the type of the elements in the {@link PCollection}
+ */
public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
private final int num;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 917f8a0..e7a9971 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.runners.spark.io;
-import com.google.common.base.Preconditions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
/**
* Create an input stream from a Queue.
*
@@ -44,6 +45,9 @@ public final class CreateStream<T> {
return new QueuedValues<>(queuedValues);
}
+ /**
+ * {@link PTransform} for queueing values.
+ */
public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
private final Iterable<Iterable<T>> queuedValues;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index 1592bec..a97d86e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -17,17 +17,19 @@
*/
package org.apache.beam.runners.spark.io;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-import kafka.serializer.Decoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+import java.util.Set;
+
+import kafka.serializer.Decoder;
+
/**
* Read stream from Kafka.
*/
@@ -36,6 +38,9 @@ public final class KafkaIO {
private KafkaIO() {
}
+ /**
+ * Read operation from Kafka topics.
+ */
public static final class Read {
private Read() {
@@ -62,6 +67,9 @@ public final class KafkaIO {
return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
}
+ /**
+ * A {@link PTransform} reading from Kafka topics and providing a {@link PCollection}.
+ */
public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
private final Class<? extends Decoder<K>> keyDecoderClass;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 5b50d3e..00c10d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -17,10 +17,6 @@
*/
package org.apache.beam.runners.spark.io.hadoop;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
import org.apache.beam.sdk.io.ShardNameTemplate;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -28,14 +24,26 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.base.Preconditions;
+
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Spark-native HadoopIO.
+ */
public final class HadoopIO {
private HadoopIO() {
}
+ /**
+ * Read operation from HDFS.
+ */
public static final class Read {
private Read() {
@@ -46,6 +54,11 @@ public final class HadoopIO {
return new Bound<>(filepattern, format, key, value);
}
+ /**
+ * A {@link PTransform} reading a bounded collection of data from HDFS.
+ * @param <K> the type of the keys read from HDFS
+ * @param <V> the type of the values read from HDFS
+ */
public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
private final String filepattern;
@@ -95,6 +108,9 @@ public final class HadoopIO {
}
+ /**
+ * Write operation on HDFS.
+ */
public static final class Write {
private Write() {
@@ -105,6 +121,9 @@ public final class HadoopIO {
return new Bound<>(filenamePrefix, format, key, value);
}
+ /**
+ * A {@link PTransform} writing a {@link PCollection} to HDFS.
+ */
public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
/** The filename to write to. */
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
index c768340..6b36427 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
@@ -18,11 +18,14 @@
package org.apache.beam.runners.spark.io.hadoop;
+import org.apache.hadoop.fs.Path;
+
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.hadoop.fs.Path;
-
+/**
+ * Shard name builder.
+ */
public final class ShardNameBuilder {
private ShardNameBuilder() {
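
replaceShardNumber and replaceShardCount, used by ShardNameTemplateHelper
below, fill the runs of 'S' and 'N' markers in templates such as
"-SSSSS-of-NNNNN". A regex-based sketch of the idea (not the patch's exact
implementation):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class ShardNameSketch {
      // Replaces the first run of 'marker' characters with the zero-padded
      // value, e.g. replace("part-SSSSS-of-NNNNN", 'S', 7) -> "part-00007-of-NNNNN".
      static String replace(String template, char marker, int value) {
        Matcher m = Pattern.compile(String.valueOf(marker) + "+").matcher(template);
        if (!m.find()) {
          return template; // no marker run present: nothing to substitute
        }
        String formatted = String.format("%0" + (m.end() - m.start()) + "d", value);
        return m.replaceFirst(formatted);
      }
    }
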
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
index 2267ccb..d06b016 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.spark.io.hadoop;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
-import java.io.IOException;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
@@ -30,6 +28,11 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+/**
+ * Shard name template helper.
+ */
public final class ShardNameTemplateHelper {
private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
index b755928..f747e7b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
@@ -18,14 +18,17 @@
package org.apache.beam.runners.spark.io.hadoop;
-import java.io.IOException;
-import java.io.OutputStream;
-
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Templated Avro key output format.
+ */
public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
implements ShardNameTemplateAware {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
index 35b6163..bd2ee4d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
@@ -18,13 +18,16 @@
package org.apache.beam.runners.spark.io.hadoop;
-import java.io.IOException;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import java.io.IOException;
+
+/**
+ * Templated sequence file output format.
+ */
public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
implements ShardNameTemplateAware {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
index 8f0c0d2..8725a95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
@@ -18,13 +18,16 @@
package org.apache.beam.runners.spark.io.hadoop;
-import java.io.IOException;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import java.io.IOException;
+
+/**
+ * Templated text output format.
+ */
public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
implements ShardNameTemplateAware {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index fbc9e98..b5888bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -18,26 +18,28 @@
package org.apache.beam.runners.spark.translation;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
+
import org.apache.spark.api.java.function.FlatMapFunction;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
/**
* Dataflow's Do functions correspond to Spark's FlatMap functions.
*
- * @param <I> Input element type.
- * @param <O> Output element type.
+ * @param <InputT> Input element type.
+ * @param <OutputT> Output element type.
*/
-public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
- WindowedValue<O>> {
- private final DoFn<I, O> mFunction;
+public class DoFnFunction<InputT, OutputT>
+ implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
+ WindowedValue<OutputT>> {
+ private final DoFn<InputT, OutputT> mFunction;
private final SparkRuntimeContext mRuntimeContext;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
@@ -46,7 +48,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
* @param runtime Runtime to apply function in.
* @param sideInputs Side inputs used in DoFunction.
*/
- public DoFnFunction(DoFn<I, O> fn,
+ public DoFnFunction(DoFn<InputT, OutputT> fn,
SparkRuntimeContext runtime,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
this.mFunction = fn;
@@ -55,7 +57,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
}
@Override
- public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
+ public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws
Exception {
ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
ctxt.setup();
@@ -63,23 +65,23 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
return ctxt.getOutputIterable(iter, mFunction);
}
- private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
+ private class ProcCtxt extends SparkProcessContext<InputT, OutputT, WindowedValue<OutputT>> {
- private final List<WindowedValue<O>> outputs = new LinkedList<>();
+ private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
- ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+ ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
BroadcastHelper<?>> sideInputs) {
super(fn, runtimeContext, sideInputs);
}
@Override
- public synchronized void output(O o) {
+ public synchronized void output(OutputT o) {
outputs.add(windowedValue != null ? windowedValue.withValue(o) :
WindowedValue.valueInGlobalWindow(o));
}
@Override
- public synchronized void output(WindowedValue<O> o) {
+ public synchronized void output(WindowedValue<OutputT> o) {
outputs.add(o);
}
@@ -89,7 +91,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
}
@Override
- protected Iterator<WindowedValue<O>> getOutputIterator() {
+ protected Iterator<WindowedValue<OutputT>> getOutputIterator() {
return outputs.iterator();
}
}
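
To make that correspondence concrete, here is a plain Spark 1.x
FlatMapFunction that splits lines into words (illustrative only, not part of
this patch); a DoFn emitting zero or more outputs per element plays exactly
this role:

    import org.apache.spark.api.java.function.FlatMapFunction;

    import java.util.Arrays;

    // In Spark 1.x, call() returns an Iterable of output elements.
    class WordSplitter implements FlatMapFunction<String, String> {
      @Override
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    }
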
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 6d49bd3..d737f5e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -20,14 +20,6 @@ package org.apache.beam.runners.spark.translation;
import static com.google.common.base.Preconditions.checkArgument;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
@@ -46,9 +38,19 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* Evaluation context allows us to define how pipeline instructions are evaluated.
@@ -151,19 +153,19 @@ public class EvaluationContext implements EvaluationResult {
return currentTransform;
}
- protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ protected <T extends PInput> T getInput(PTransform<T, ?> transform) {
checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
@SuppressWarnings("unchecked")
- I input = (I) currentTransform.getInput();
+ T input = (T) currentTransform.getInput();
return input;
}
- protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+ protected <T extends POutput> T getOutput(PTransform<?, T> transform) {
checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
"can only be called with current transform");
@SuppressWarnings("unchecked")
- O output = (O) currentTransform.getOutput();
+ T output = (T) currentTransform.getOutput();
return output;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 2641e31..daa767d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -18,39 +18,42 @@
package org.apache.beam.runners.spark.translation;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
+
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.joda.time.Instant;
+
+import java.util.Iterator;
+import java.util.Map;
+
import scala.Tuple2;
/**
* DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
* underlying data with multiple TupleTags.
*
- * @param <I> Input type for DoFunction.
- * @param <O> Output type for DoFunction.
+ * @param <InputT> Input type for DoFunction.
+ * @param <OutputT> Output type for DoFunction.
*/
-class MultiDoFnFunction<I, O>
- implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> {
- private final DoFn<I, O> mFunction;
+class MultiDoFnFunction<InputT, OutputT>
+ implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
+ private final DoFn<InputT, OutputT> mFunction;
private final SparkRuntimeContext mRuntimeContext;
- private final TupleTag<O> mMainOutputTag;
+ private final TupleTag<OutputT> mMainOutputTag;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
MultiDoFnFunction(
- DoFn<I, O> fn,
+ DoFn<InputT, OutputT> fn,
SparkRuntimeContext runtimeContext,
- TupleTag<O> mainOutputTag,
+ TupleTag<OutputT> mainOutputTag,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
this.mFunction = fn;
this.mRuntimeContext = runtimeContext;
@@ -60,29 +63,30 @@ class MultiDoFnFunction<I, O>
@Override
public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
- call(Iterator<WindowedValue<I>> iter) throws Exception {
+ call(Iterator<WindowedValue<InputT>> iter) throws Exception {
ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
mFunction.startBundle(ctxt);
ctxt.setup();
return ctxt.getOutputIterable(iter, mFunction);
}
- private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> {
+ private class ProcCtxt
+ extends SparkProcessContext<InputT, OutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> {
private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
- ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+ ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
BroadcastHelper<?>> sideInputs) {
super(fn, runtimeContext, sideInputs);
}
@Override
- public synchronized void output(O o) {
+ public synchronized void output(OutputT o) {
outputs.put(mMainOutputTag, windowedValue.withValue(o));
}
@Override
- public synchronized void output(WindowedValue<O> o) {
+ public synchronized void output(WindowedValue<OutputT> o) {
outputs.put(mMainOutputTag, o);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 2bc8a7b..225afb8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -22,6 +22,9 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
+/**
+ * The Spark context factory.
+ */
public final class SparkContextFactory {
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
index 0f47af6..609c413 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
@@ -38,16 +38,17 @@ public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator
}
@Override
- protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
+ protected <TransformT extends PTransform<? super PInput, POutput>>
+ void doVisitTransform(TransformTreeNode
node) {
@SuppressWarnings("unchecked")
- PT transform = (PT) node.getTransform();
+ TransformT transform = (TransformT) node.getTransform();
@SuppressWarnings("unchecked")
- Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
- @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
- (TransformEvaluator<PT>) translator.translate(transformClass);
+ Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
+ @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
+ (TransformEvaluator<TransformT>) translator.translate(transformClass);
LOG.info("Evaluating {}", transform);
- AppliedPTransform<PInput, POutput, PT> appliedTransform =
+ AppliedPTransform<PInput, POutput, TransformT> appliedTransform =
AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
ctxt.setCurrentTransform(appliedTransform);
evaluator.evaluate(transform, ctxt);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
index 77849a9..997940b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
@@ -26,5 +26,6 @@ public interface SparkPipelineTranslator {
boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
- <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
+ <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+ translate(Class<TransformT> clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 5d4ece6..4f90a12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -18,13 +18,6 @@
package org.apache.beam.runners.spark.translation;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,21 +33,34 @@ import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Spark runner process context.
+ */
+public abstract class SparkProcessContext<InputT, OutputT, ValueT>
+ extends DoFn<InputT, OutputT>.ProcessContext {
private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
- private final DoFn<I, O> fn;
+ private final DoFn<InputT, OutputT> fn;
private final SparkRuntimeContext mRuntimeContext;
private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
- protected WindowedValue<I> windowedValue;
+ protected WindowedValue<InputT> windowedValue;
- SparkProcessContext(DoFn<I, O> fn,
+ SparkProcessContext(DoFn<InputT, OutputT> fn,
SparkRuntimeContext runtime,
Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
fn.super();
@@ -82,9 +88,9 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
@Override
- public abstract void output(O output);
+ public abstract void output(OutputT output);
- public abstract void output(WindowedValue<O> output);
+ public abstract void output(WindowedValue<OutputT> output);
@Override
public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
@@ -104,19 +110,20 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
@Override
- public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
+  public <AggregatorInputT, AggregatorOutputT>
+      Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(
String named,
- Combine.CombineFn<AI, ?, AO> combineFn) {
+      Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
return mRuntimeContext.createAggregator(named, combineFn);
}
@Override
- public I element() {
+ public InputT element() {
return windowedValue.getValue();
}
@Override
- public void outputWithTimestamp(O output, Instant timestamp) {
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
output(WindowedValue.of(output, timestamp,
windowedValue.getWindows(), windowedValue.getPane()));
}
@@ -141,8 +148,8 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
@Override
- public WindowingInternals<I, O> windowingInternals() {
- return new WindowingInternals<I, O>() {
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return new WindowingInternals<InputT, OutputT>() {
@Override
public Collection<? extends BoundedWindow> windows() {
@@ -150,7 +157,7 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
@Override
- public void outputWindowedValue(O output, Instant timestamp, Collection<?
+ public void outputWindowedValue(OutputT output, Instant timestamp, Collection<?
extends BoundedWindow> windows, PaneInfo paneInfo) {
output(WindowedValue.of(output, timestamp, windows, paneInfo));
}
@@ -190,33 +197,33 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
protected abstract void clearOutput();
- protected abstract Iterator<V> getOutputIterator();
+ protected abstract Iterator<ValueT> getOutputIterator();
- protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter,
- final DoFn<I, O> doFn) {
- return new Iterable<V>() {
+ protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter,
+ final DoFn<InputT, OutputT> doFn) {
+ return new Iterable<ValueT>() {
@Override
- public Iterator<V> iterator() {
+ public Iterator<ValueT> iterator() {
return new ProcCtxtIterator(iter, doFn);
}
};
}
- private class ProcCtxtIterator extends AbstractIterator<V> {
+ private class ProcCtxtIterator extends AbstractIterator<ValueT> {
- private final Iterator<WindowedValue<I>> inputIterator;
- private final DoFn<I, O> doFn;
- private Iterator<V> outputIterator;
+ private final Iterator<WindowedValue<InputT>> inputIterator;
+ private final DoFn<InputT, OutputT> doFn;
+ private Iterator<ValueT> outputIterator;
private boolean calledFinish;
- ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
+ ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) {
this.inputIterator = iterator;
this.doFn = doFn;
this.outputIterator = getOutputIterator();
}
@Override
- protected V computeNext() {
+ protected ValueT computeNext() {
// Process each element from the (input) iterator, which produces zero, one or more
// output elements (of type ValueT) in the output iterator. Note that the output
// collection (and iterator) is reset between each call to processElement, so the
@@ -253,6 +260,9 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
}
}
+ /**
+ * Spark process runtime exception.
+ */
public static class SparkProcessException extends RuntimeException {
SparkProcessException(Throwable t) {
super(t);
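
ProcCtxtIterator above builds on Guava's AbstractIterator. A self-contained
sketch of the same lazy pattern (illustrative, not from this patch): drain the
current output iterator first, then pull the next input element.

    import com.google.common.collect.AbstractIterator;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;

    // Emits each input element twice, lazily.
    class EchoIterator extends AbstractIterator<String> {
      private final Iterator<String> input;
      private Iterator<String> output = Collections.emptyIterator();

      EchoIterator(Iterator<String> input) {
        this.input = input;
      }

      @Override
      protected String computeNext() {
        while (!output.hasNext()) {
          if (!input.hasNext()) {
            return endOfData(); // both exhausted: signal end of iteration
          }
          String next = input.next();
          output = Arrays.asList(next, next).iterator(); // "process" one element
        }
        return output.next();
      }
    }
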
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index ea125de..46f5b33 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -18,15 +18,6 @@
package org.apache.beam.runners.spark.translation;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
import org.apache.beam.runners.spark.aggregators.AggAccumParam;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.sdk.Pipeline;
@@ -41,9 +32,20 @@ import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -122,22 +124,22 @@ public class SparkRuntimeContext implements Serializable {
*
* @param named Name of aggregator.
* @param combineFn Combine function used in aggregation.
- * @param <IN> Type of inputs to aggregator.
- * @param <INTER> Intermediate data type
- * @param <OUT> Type of aggregator outputs.
+ * @param <InputT> Type of inputs to aggregator.
+ * @param <InterT> Intermediate data type
+ * @param <OutputT> Type of aggregator outputs.
* @return Specified aggregator
*/
- public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
+ public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator(
String named,
- Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
+ Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
@SuppressWarnings("unchecked")
- Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
+ Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
if (aggregator == null) {
@SuppressWarnings("unchecked")
- NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
+ NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
new NamedAggregators.CombineFunctionState<>(
- (Combine.CombineFn<IN, INTER, OUT>) combineFn,
- (Coder<IN>) getCoder(combineFn),
+ (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+ (Coder<InputT>) getCoder(combineFn),
this);
accum.add(new NamedAggregators(named, state));
aggregator = new SparkAggregator<>(named, state);
@@ -186,13 +188,14 @@ public class SparkRuntimeContext implements Serializable {
/**
* Initialize spark aggregators exactly once.
*
- * @param <IN> Type of element fed in to aggregator.
+   * @param <InputT> Type of element fed into the aggregator.
*/
- private static class SparkAggregator<IN, OUT> implements Aggregator<IN, OUT>, Serializable {
+ private static class SparkAggregator<InputT, OutputT>
+ implements Aggregator<InputT, OutputT>, Serializable {
private final String name;
- private final NamedAggregators.State<IN, ?, OUT> state;
+ private final NamedAggregators.State<InputT, ?, OutputT> state;
- SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) {
+ SparkAggregator(String name, NamedAggregators.State<InputT, ?, OutputT> state) {
this.name = name;
this.state = state;
}
@@ -203,12 +206,12 @@ public class SparkRuntimeContext implements Serializable {
}
@Override
- public void addValue(IN elem) {
+ public void addValue(InputT elem) {
state.update(elem);
}
@Override
- public Combine.CombineFn<IN, ?, OUT> getCombineFn() {
+ public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
return state.getCombineFn();
}
}
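
A fragment showing how the renamed createAggregator is used (a sketch:
runtimeContext stands in for a live SparkRuntimeContext, and Sum.SumIntegerFn
is the SDK's integer-sum CombineFn):

    // Creates the named accumulator-backed aggregator, or returns the
    // existing one if it was already registered under that name.
    Aggregator<Integer, Integer> recordCount =
        runtimeContext.createAggregator("recordCount", new Sum.SumIntegerFn());
    recordCount.addValue(1); // updates the backing Spark Accumulator
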
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
index 30ab076..c5c7128 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
@@ -18,10 +18,13 @@
package org.apache.beam.runners.spark.translation;
-import java.io.Serializable;
-
import org.apache.beam.sdk.transforms.PTransform;
-public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable {
- void evaluate(PT transform, EvaluationContext context);
+import java.io.Serializable;
+
+/**
+ * Describes a {@link PTransform} evaluator.
+ */
+public interface TransformEvaluator<TransformT extends PTransform<?, ?>> extends Serializable {
+ void evaluate(TransformT transform, EvaluationContext context);
}
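
A hypothetical no-op implementation, only to show the shape of the renamed
contract (getInputRDD/setOutputRDD are the EvaluationContext calls the real
evaluators in TransformTranslator below rely on):

    import org.apache.beam.sdk.transforms.PTransform;

    class NoOpEvaluator<TransformT extends PTransform<?, ?>>
        implements TransformEvaluator<TransformT> {
      @Override
      public void evaluate(TransformT transform, EvaluationContext context) {
        // A real evaluator reads context.getInputRDD(transform), applies
        // Spark operations, and registers the result with context.setOutputRDD(...).
      }
    }
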
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0366856..b462d35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -23,19 +23,6 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
@@ -68,6 +55,14 @@ import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -81,6 +76,14 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import scala.Tuple2;
/**
@@ -91,6 +94,9 @@ public final class TransformTranslator {
private TransformTranslator() {
}
+ /**
+ * Reflection-based getter for a named field.
+ */
public static class FieldGetter {
private final Map<String, Field> fields;
@@ -157,14 +163,16 @@ public final class TransformTranslator {
private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
- private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
- return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
+ private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
+ grouped() {
+ return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
@Override
- public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
- Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
+ public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
+ EvaluationContext context) {
+ Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed = GROUPED_FG.get("fn", transform);
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
- (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
+ JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
+ (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>) context.getInputRDD(transform);
context.setOutputRDD(transform,
inRDD.map(new KVFunction<>(keyed)));
}
@@ -173,19 +181,21 @@ public final class TransformTranslator {
private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
- private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
- return new TransformEvaluator<Combine.Globally<I, O>>() {
+ private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>>
+ combineGlobally() {
+ return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
@Override
- public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
- final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
+ public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
+ final Combine.CombineFn<InputT, AccumT, OutputT> globally =
+ COMBINE_GLOBALLY_FG.get("fn", transform);
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRdd =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+ JavaRDDLike<WindowedValue<InputT>, ?> inRdd =
+ (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
- final Coder<I> iCoder = context.getInput(transform).getCoder();
- final Coder<A> aCoder;
+ final Coder<InputT> iCoder = context.getInput(transform).getCoder();
+ final Coder<AccumT> aCoder;
try {
aCoder = globally.getAccumulatorCoder(
context.getPipeline().getCoderRegistry(), iCoder);
@@ -196,86 +206,92 @@ public final class TransformTranslator {
// Use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle.
JavaRDD<byte[]> inRddBytes = inRdd
- .map(WindowingHelpers.<I>unwindowFunction())
+ .map(WindowingHelpers.<InputT>unwindowFunction())
.map(CoderHelpers.toByteFunction(iCoder));
- /*A*/ byte[] acc = inRddBytes.aggregate(
+ /*AccumT*/ byte[] acc = inRddBytes.aggregate(
CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
- new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
+ new Function2</*AccumT*/ byte[], /*InputT*/ byte[], /*AccumT*/ byte[]>() {
@Override
- public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
- A a = CoderHelpers.fromByteArray(ab, aCoder);
- I i = CoderHelpers.fromByteArray(ib, iCoder);
+ public /*AccumT*/ byte[] call(/*AccumT*/ byte[] ab, /*InputT*/ byte[] ib)
+ throws Exception {
+ AccumT a = CoderHelpers.fromByteArray(ab, aCoder);
+ InputT i = CoderHelpers.fromByteArray(ib, iCoder);
return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
}
},
- new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
+ new Function2</*AccumT*/ byte[], /*AccumT*/ byte[], /*AccumT*/ byte[]>() {
@Override
- public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
- A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
- A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
+ public /*AccumT*/ byte[] call(/*AccumT*/ byte[] a1b, /*AccumT*/ byte[] a2b)
+ throws Exception {
+ AccumT a1 = CoderHelpers.fromByteArray(a1b, aCoder);
+ AccumT a2 = CoderHelpers.fromByteArray(a2b, aCoder);
// don't use Guava's ImmutableList.of as values may be null
- List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
- A merged = globally.mergeAccumulators(accumulators);
+ List<AccumT> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
+ AccumT merged = globally.mergeAccumulators(accumulators);
return CoderHelpers.toByteArray(merged, aCoder);
}
}
);
- O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
+ OutputT output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
- Coder<O> coder = context.getOutput(transform).getCoder();
+ Coder<OutputT> coder = context.getOutput(transform).getCoder();
JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
// don't use Guava's ImmutableList.of as output may be null
CoderHelpers.toByteArrays(Collections.singleton(output), coder));
context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
- .map(WindowingHelpers.<O>windowFunction()));
+ .map(WindowingHelpers.<OutputT>windowFunction()));
}
};
}
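
The byte[]-based aggregate above leans entirely on coder round-trips. In
isolation the pattern is (a fragment; VarIntCoder is one of the SDK's stock
coders):

    // Encode with the coder so Spark can shuffle plain byte arrays,
    // then decode back on the other side of the shuffle.
    VarIntCoder coder = VarIntCoder.of();
    byte[] bytes = CoderHelpers.toByteArray(42, coder);
    int roundTripped = CoderHelpers.fromByteArray(bytes, coder);
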
private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
- private static <K, VI, VA, VO> TransformEvaluator<Combine.PerKey<K, VI, VO>> combinePerKey() {
- return new TransformEvaluator<Combine.PerKey<K, VI, VO>>() {
+ private static <K, InputT, AccumT, OutputT>
+ TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
+ return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
@Override
- public void evaluate(Combine.PerKey<K, VI, VO> transform, EvaluationContext context) {
- final Combine.KeyedCombineFn<K, VI, VA, VO> keyed =
+      public void evaluate(Combine.PerKey<K, InputT, OutputT> transform,
+          EvaluationContext context) {
+ final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed =
COMBINE_PERKEY_FG.get("fn", transform);
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd =
- (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform);
+ JavaRDDLike<WindowedValue<KV<K, InputT>>, ?> inRdd =
+ (JavaRDDLike<WindowedValue<KV<K, InputT>>, ?>) context.getInputRDD(transform);
@SuppressWarnings("unchecked")
- KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+ KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>)
+ context.getInput(transform).getCoder();
Coder<K> keyCoder = inputCoder.getKeyCoder();
- Coder<VI> viCoder = inputCoder.getValueCoder();
- Coder<VA> vaCoder;
+ Coder<InputT> viCoder = inputCoder.getValueCoder();
+ Coder<AccumT> vaCoder;
try {
vaCoder = keyed.getAccumulatorCoder(
context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
} catch (CannotProvideCoderException e) {
throw new IllegalStateException("Could not determine coder for accumulator", e);
}
- Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
- Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
+ Coder<KV<K, InputT>> kviCoder = KvCoder.of(keyCoder, viCoder);
+ Coder<KV<K, AccumT>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
// We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
// since the functions passed to combineByKey don't receive the associated key of each
// value, and we need to map back into methods in Combine.KeyedCombineFn, which each
- // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark
- // provides a way to include keys in the arguments of combine/merge functions, we won't
- // need to duplicate the keys anymore.
+ // require the key in addition to the InputT's and AccumT's being merged/accumulated.
+ // Once Spark provides a way to include keys in the arguments of combine/merge functions,
+ // we won't need to duplicate the keys anymore.
// Key has to be windowed in order to group by window as well
- JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
+ JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
inRdd.flatMapToPair(
- new PairFlatMapFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
- WindowedValue<KV<K, VI>>>() {
+ new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>,
+ WindowedValue<KV<K, InputT>>>() {
@Override
public Iterable<Tuple2<WindowedValue<K>,
- WindowedValue<KV<K, VI>>>> call(WindowedValue<KV<K, VI>> kv) {
+ WindowedValue<KV<K, InputT>>>>
+ call(WindowedValue<KV<K, InputT>> kv) {
List<Tuple2<WindowedValue<K>,
- WindowedValue<KV<K, VI>>>> tuple2s =
+ WindowedValue<KV<K, InputT>>>> tuple2s =
Lists.newArrayListWithCapacity(kv.getWindows().size());
for (BoundedWindow boundedWindow: kv.getWindows()) {
WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
@@ -289,10 +305,10 @@ public final class TransformTranslator {
final WindowedValue.FullWindowedValueCoder<K> wkCoder =
WindowedValue.FullWindowedValueCoder.of(keyCoder,
context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
- final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder =
+ final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
WindowedValue.FullWindowedValueCoder.of(kviCoder,
context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
- final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder =
+ final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
WindowedValue.FullWindowedValueCoder.of(kvaCoder,
context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
@@ -301,58 +317,69 @@ public final class TransformTranslator {
JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
.mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
- // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final
- // output types) since Combine.CombineFn only provides ways to merge VAs, and no way
- // to merge VOs.
- JavaPairRDD</*K*/ ByteArray, /*KV<K, VA>*/ byte[]> accumulatedBytes =
+ // The output of combineByKey will be "AccumT" (accumulator)
+ // types rather than "OutputT" (final output types) since Combine.CombineFn
+ // only provides ways to merge AccumTs, and no way to merge OutputTs.
+ JavaPairRDD</*K*/ ByteArray, /*KV<K, AccumT>*/ byte[]> accumulatedBytes =
inRddDuplicatedKeyPairBytes.combineByKey(
- new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+ new Function</*KV<K, InputT>*/ byte[], /*KV<K, AccumT>*/ byte[]>() {
@Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) {
- WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- VA va = keyed.createAccumulator(wkvi.getValue().getKey());
+ public /*KV<K, AccumT>*/ byte[] call(/*KV<K, InputT>*/ byte[] input) {
+ WindowedValue<KV<K, InputT>> wkvi =
+ CoderHelpers.fromByteArray(input, wkviCoder);
+ AccumT va = keyed.createAccumulator(wkvi.getValue().getKey());
va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
- WindowedValue<KV<K, VA>> wkva =
+ WindowedValue<KV<K, AccumT>> wkva =
WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
wkvi.getWindows(), wkvi.getPane());
return CoderHelpers.toByteArray(wkva, wkvaCoder);
}
},
- new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+ new Function2</*KV<K, AccumT>*/ byte[],
+ /*KV<K, InputT>*/ byte[],
+ /*KV<K, AccumT>*/ byte[]>() {
@Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc,
- /*KV<K, VI>*/ byte[] input) {
- WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder);
- WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
+ public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc,
+ /*KV<K, InputT>*/ byte[] input) {
+ WindowedValue<KV<K, AccumT>> wkva =
+ CoderHelpers.fromByteArray(acc, wkvaCoder);
+ WindowedValue<KV<K, InputT>> wkvi =
+ CoderHelpers.fromByteArray(input, wkviCoder);
+ AccumT va =
+ keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
wkvi.getValue().getValue());
wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
wkva.getWindows(), wkva.getPane());
return CoderHelpers.toByteArray(wkva, wkvaCoder);
}
},
- new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+ new Function2</*KV<K, AccumT>*/ byte[],
+ /*KV<K, AccumT>*/ byte[],
+ /*KV<K, AccumT>*/ byte[]>() {
@Override
- public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1,
- /*KV<K, VA>*/ byte[] acc2) {
- WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder);
- WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder);
- VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
+ public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc1,
+ /*KV<K, AccumT>*/ byte[] acc2) {
+ WindowedValue<KV<K, AccumT>> wkva1 =
+ CoderHelpers.fromByteArray(acc1, wkvaCoder);
+ WindowedValue<KV<K, AccumT>> wkva2 =
+ CoderHelpers.fromByteArray(acc2, wkvaCoder);
+ AccumT va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
// don't use Guava's ImmutableList.of as values may be null
Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
wkva2.getValue().getValue())));
- WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(),
+ WindowedValue<KV<K, AccumT>> wkva =
+ WindowedValue.of(KV.of(wkva1.getValue().getKey(),
va), wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
return CoderHelpers.toByteArray(wkva, wkvaCoder);
}
});
- JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes
+ JavaPairRDD<WindowedValue<K>, WindowedValue<OutputT>> extracted = accumulatedBytes
.mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
.mapValues(
- new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() {
+ new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
@Override
- public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) {
+ public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
acc.getValue().getValue()), acc.getTimestamp(),
acc.getWindows(), acc.getPane());
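The hunk above is the heart of the translation: Combine.KeyedCombineFn is
mapped onto Spark's combineByKey, whose three callbacks line up with
createAccumulator plus addInput, addInput, and mergeAccumulators respectively.
Note also the Collections.unmodifiableList(Arrays.asList(...)) in the merge
step: as the inline comment says, accumulator values may be null, which
Guava's ImmutableList rejects. Keys and values travel through the shuffle as
coder-encoded bytes, with keys wrapped in ByteArray, presumably because Spark
groups by equals/hashCode and raw byte arrays only compare by identity. The
following simplified sketch (a stand-in, not the runner's actual ByteArray
class) shows why such a wrapper is needed.

import java.util.Arrays;

public final class EncodedKey {
  private final byte[] bytes;

  public EncodedKey(byte[] bytes) {
    this.bytes = bytes;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }

  public static void main(String[] args) {
    byte[] a = {1, 2};
    byte[] b = {1, 2};
    // Raw arrays compare by identity, so equal encodings would not group:
    System.out.println(a.equals(b));                                  // false
    // The wrapper compares by content, so they group correctly:
    System.out.println(new EncodedKey(a).equals(new EncodedKey(b)));  // true
  }
}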
@@ -361,12 +388,14 @@ public final class TransformTranslator {
context.setOutputRDD(transform,
fromPair(extracted)
- .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() {
+ .map(new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>,
+ WindowedValue<KV<K, OutputT>>>() {
@Override
- public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo)
+ public WindowedValue<KV<K, OutputT>> call(KV<WindowedValue<K>,
+ WindowedValue<OutputT>> kwvo)
throws Exception {
- WindowedValue<VO> wvo = kwvo.getValue();
- KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
+ WindowedValue<OutputT> wvo = kwvo.getValue();
+ KV<K, OutputT> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
}
}));
@@ -374,18 +403,20 @@ public final class TransformTranslator {
};
}
- private static final class KVFunction<K, VI, VO>
- implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> {
- private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed;
+ private static final class KVFunction<K, InputT, OutputT>
+ implements Function<WindowedValue<KV<K, Iterable<InputT>>>,
+ WindowedValue<KV<K, OutputT>>> {
+ private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed;
- KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) {
+ KVFunction(Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed) {
this.keyed = keyed;
}
@Override
- public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv)
+ public WindowedValue<KV<K, OutputT>> call(WindowedValue<KV<K,
+ Iterable<InputT>>> windowedKv)
throws Exception {
- KV<K, Iterable<VI>> kv = windowedKv.getValue();
+ KV<K, Iterable<InputT>> kv = windowedKv.getValue();
return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
}
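KVFunction above applies the keyed combine function to values that were
already grouped per key, passing the window metadata (timestamp, windows,
pane) through unchanged. A dependency-free sketch of that shape, with
illustrative names rather than Beam's:

import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class KeyedApplySketch {
  // Apply a key-aware combine function to an already-grouped (key, values)
  // pair, analogous to keyed.apply(kv.getKey(), kv.getValue()) above.
  static <K, I, O> O applyKeyed(K key, Iterable<I> values,
      BiFunction<K, Iterable<I>, O> keyedFn) {
    return keyedFn.apply(key, values);
  }

  public static void main(String[] args) {
    List<Integer> grouped = Arrays.asList(1, 2, 3);
    int sum = applyKeyed("a", grouped, (k, vs) -> {
      int s = 0;
      for (int v : vs) {
        s += v;
      }
      return s;
    });
    System.out.println(sum); // 6
  }
}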
@@ -409,17 +440,17 @@ public final class TransformTranslator {
});
}
- private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() {
- return new TransformEvaluator<ParDo.Bound<I, O>>() {
+ private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
+ return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
- public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) {
- DoFnFunction<I, O> dofn =
+ public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+ DoFnFunction<InputT, OutputT> dofn =
new DoFnFunction<>(transform.getFn(),
context.getRuntimeContext(),
getSideInputs(transform.getSideInputs(), context));
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRDD =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+ JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
+ (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
}
};
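The parDo translation wraps the DoFn in a DoFnFunction and hands it to
mapPartitions, so the function receives a whole partition's iterator instead
of being invoked once per element; per-bundle setup and teardown can then run
once per partition. A rough, dependency-free sketch of that shape (it assumes
nothing about DoFnFunction's internals):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class MapPartitionsSketch {
  static <I, O> List<O> mapPartition(Iterator<I> partition,
      Function<I, O> perElement) {
    // One-time, per-partition setup would go here.
    List<O> out = new ArrayList<>();
    while (partition.hasNext()) {
      out.add(perElement.apply(partition.next()));
    }
    // Per-partition teardown would go here.
    return out;
  }

  public static void main(String[] args) {
    List<Integer> partition = Arrays.asList(1, 2, 3);
    System.out.println(mapPartition(partition.iterator(), x -> x * 10)); // [10, 20, 30]
  }
}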
@@ -427,20 +458,20 @@ public final class TransformTranslator {
private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
- private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() {
- return new TransformEvaluator<ParDo.BoundMulti<I, O>>() {
+ private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>
+ multiDo() {
+ return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
@Override
- public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) {
- TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
- MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>(
+ public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+ TupleTag<OutputT> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
+ MultiDoFnFunction<InputT, OutputT> multifn = new MultiDoFnFunction<>(
transform.getFn(),
context.getRuntimeContext(),
mainOutputTag,
getSideInputs(transform.getSideInputs(), context));
@SuppressWarnings("unchecked")
- JavaRDDLike<WindowedValue<I>, ?> inRDD =
- (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+ JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
+ (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
.mapPartitionsToPair(multifn)
.cache();
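For the multi-output case the tagged pair RDD is cached, presumably because
each of the transform's outputs is later extracted from that same RDD, and
without caching every extraction would re-run the DoFn. A dependency-free
sketch of the tag-then-slice pattern, with plain strings standing in for
TupleTags:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiOutputSketch {
  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4);

    // Single evaluation pass: tag every element once (the expensive step).
    Map<String, List<Integer>> tagged = new HashMap<>();
    for (int v : input) {
      tagged.computeIfAbsent(v % 2 == 0 ? "even" : "odd",
          t -> new ArrayList<>()).add(v);
    }

    // Each "output" is just a per-tag slice of the cached result.
    System.out.println(tagged.get("even")); // [2, 4]
    System.out.println(tagged.get("odd"));  // [1, 3]
  }
}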
@@ -716,10 +747,12 @@ public final class TransformTranslator {
};
}
- private static <R, W> TransformEvaluator<View.CreatePCollectionView<R, W>> createPCollView() {
- return new TransformEvaluator<View.CreatePCollectionView<R, W>>() {
+ private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
+ createPCollView() {
+ return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@Override
- public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) {
+ public void evaluate(View.CreatePCollectionView<ReadT, WriteT> transform,
+ EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
context.setPView(context.getOutput(transform), iter);
@@ -787,10 +820,11 @@ public final class TransformTranslator {
EVALUATORS.put(Window.Bound.class, window());
}
- public static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
- getTransformEvaluator(Class<PT> clazz) {
+ public static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+ getTransformEvaluator(Class<TransformT> clazz) {
@SuppressWarnings("unchecked")
- TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+ TransformEvaluator<TransformT> transform =
+ (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
if (transform == null) {
throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
}
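getTransformEvaluator is a class-keyed registry lookup, and the
@SuppressWarnings("unchecked") cast is sound only by convention: every
EVALUATORS.put call must pair a transform class with an evaluator of the
matching type. A minimal sketch of the same pattern, using hypothetical names
rather than the runner's types:

import java.util.HashMap;
import java.util.Map;

public class RegistrySketch {
  interface Evaluator<T> {
    void evaluate(T value);
  }

  private static final Map<Class<?>, Evaluator<?>> EVALUATORS = new HashMap<>();

  // Registration is the only place the pairing invariant is enforced:
  // the class token and the evaluator share the same type argument.
  static <T> void register(Class<T> clazz, Evaluator<T> evaluator) {
    EVALUATORS.put(clazz, evaluator);
  }

  @SuppressWarnings("unchecked")
  static <T> Evaluator<T> lookup(Class<T> clazz) {
    Evaluator<T> evaluator = (Evaluator<T>) EVALUATORS.get(clazz);
    if (evaluator == null) {
      throw new IllegalStateException("No evaluator registered for " + clazz);
    }
    return evaluator;
  }

  public static void main(String[] args) {
    register(String.class, s -> System.out.println("evaluating: " + s));
    lookup(String.class).evaluate("hello");
  }
}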
@@ -808,7 +842,8 @@ public final class TransformTranslator {
}
@Override
- public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+ public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(
+ Class<TransformT> clazz) {
return getTransformEvaluator(clazz);
}
}