Posted to commits@beam.apache.org by jb...@apache.org on 2016/05/08 06:06:14 UTC

[1/2] incubator-beam git commit: [BEAM-267] Enable checkstyle in Spark runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master ff825b077 -> 0f3b05335


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 1c83700..afcca93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -18,13 +18,6 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.Pipeline;
@@ -35,6 +28,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -43,6 +37,13 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
 
 /**
  * Streaming evaluation context helps to handle streaming.
@@ -179,11 +180,11 @@ public class StreamingEvaluationContext extends EvaluationContext {
 
   //---------------- override in order to expose in package
   @Override
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+  protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
     return super.getInput(transform);
   }
   @Override
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+  protected <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
     return super.getOutput(transform);
   }
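
The bulk of the hunk above is import reordering. The checkstyle configuration itself is not part of this mail, but the new ordering is consistent with an import-order rule that groups imports as: static imports first, then org.apache.beam, then other third-party packages (com.google, org.apache.commons, org.apache.spark, org.junit, ...), then java/javax, then everything else (kafka, scala), with blank lines between groups. A minimal sketch of that grouping, assuming such a rule; the package and class names here are hypothetical:

    package org.apache.beam.runners.spark.examples;

    // Group: org.apache.beam.*
    import org.apache.beam.sdk.Pipeline;

    // Group: other third-party packages (com.google.*, org.apache.spark.*, ...)
    import com.google.common.collect.ImmutableList;

    import org.apache.spark.api.java.JavaSparkContext;

    // Group: java.* and javax.*
    import java.util.List;

    // Group: everything else (kafka.*, scala.*, ...)
    import scala.Tuple2;

    /** Hypothetical class; the imports exist only to illustrate the ordering. */
    public final class ImportOrderExample {
      private ImportOrderExample() { }
    }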
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index d9daeb0..c1ecc43 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -17,18 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.api.client.util.Lists;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
-import com.google.common.reflect.TypeToken;
-import kafka.serializer.Decoder;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.KafkaIO;
@@ -58,6 +46,12 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.util.Lists;
+import com.google.api.client.util.Maps;
+import com.google.api.client.util.Sets;
+import com.google.common.reflect.TypeToken;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
@@ -68,6 +62,15 @@ import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.kafka.KafkaUtils;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import kafka.serializer.Decoder;
 import scala.Tuple2;
 
 
@@ -173,14 +176,14 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> rddTransform(
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> rddTransform(
       final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
+    return new TransformEvaluator<TransformT>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
+      public void evaluate(TransformT transform, EvaluationContext context) {
+        TransformEvaluator<TransformT> rddEvaluator =
+            rddTranslator.translate((Class<TransformT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -203,19 +206,20 @@ public final class StreamingTransformTranslator {
    * RDD transform function If the transformation function doesn't have an input, create a fake one
    * as an empty RDD.
    *
-   * @param <PT> PTransform type
+   * @param <TransformT> PTransform type
    */
-  private static final class RDDTransform<PT extends PTransform<?, ?>>
+  private static final class RDDTransform<TransformT extends PTransform<?, ?>>
       implements Function<JavaRDD<WindowedValue<Object>>, JavaRDD<WindowedValue<Object>>> {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
+    private final TransformEvaluator<TransformT> rddEvaluator;
+    private final TransformT transform;
 
 
-    private RDDTransform(StreamingEvaluationContext context, TransformEvaluator<PT> rddEvaluator,
-        PT transform) {
+    private RDDTransform(StreamingEvaluationContext context,
+                         TransformEvaluator<TransformT> rddEvaluator,
+        TransformT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
       this.rddEvaluator = rddEvaluator;
@@ -243,13 +247,13 @@ public final class StreamingTransformTranslator {
   }
 
   @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT> foreachRDD(
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> foreachRDD(
       final SparkPipelineTranslator rddTranslator) {
-    return new TransformEvaluator<PT>() {
+    return new TransformEvaluator<TransformT>() {
       @Override
-      public void evaluate(PT transform, EvaluationContext context) {
-        TransformEvaluator<PT> rddEvaluator =
-            rddTranslator.translate((Class<PT>) transform.getClass());
+      public void evaluate(TransformT transform, EvaluationContext context) {
+        TransformEvaluator<TransformT> rddEvaluator =
+            rddTranslator.translate((Class<TransformT>) transform.getClass());
 
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         if (sec.hasStream(transform)) {
@@ -268,19 +272,19 @@ public final class StreamingTransformTranslator {
   /**
    * RDD output function.
    *
-   * @param <PT> PTransform type
+   * @param <TransformT> PTransform type
    */
-  private static final class RDDOutputOperator<PT extends PTransform<?, ?>>
+  private static final class RDDOutputOperator<TransformT extends PTransform<?, ?>>
       implements VoidFunction<JavaRDD<WindowedValue<Object>>> {
 
     private final StreamingEvaluationContext context;
     private final AppliedPTransform<?, ?, ?> appliedPTransform;
-    private final TransformEvaluator<PT> rddEvaluator;
-    private final PT transform;
+    private final TransformEvaluator<TransformT> rddEvaluator;
+    private final TransformT transform;
 
 
     private RDDOutputOperator(StreamingEvaluationContext context,
-        TransformEvaluator<PT> rddEvaluator, PT transform) {
+                              TransformEvaluator<TransformT> rddEvaluator, TransformT transform) {
       this.context = context;
       this.appliedPTransform = context.getCurrentTransform();
       this.rddEvaluator = rddEvaluator;
@@ -325,7 +329,7 @@ public final class StreamingTransformTranslator {
         //--- then we apply windowing to the elements
         DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
         DoFnFunction<T, T> dofn = new DoFnFunction<>(addWindowsDoFn,
-            ((StreamingEvaluationContext)context).getRuntimeContext(), null);
+            ((StreamingEvaluationContext) context).getRuntimeContext(), null);
         @SuppressWarnings("unchecked")
         JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>> dstream =
             (JavaDStreamLike<WindowedValue<T>, ?, JavaRDD<WindowedValue<T>>>)
@@ -361,9 +365,10 @@ public final class StreamingTransformTranslator {
   }
 
   @SuppressWarnings("unchecked")
-  private static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
-      getTransformEvaluator(Class<PT> clazz, SparkPipelineTranslator rddTranslator) {
-    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+  private static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+      getTransformEvaluator(Class<TransformT> clazz, SparkPipelineTranslator rddTranslator) {
+    TransformEvaluator<TransformT> transform =
+        (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
     if (transform == null) {
       if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
         throw new UnsupportedOperationException("Dataflow transformation " + clazz
@@ -383,7 +388,8 @@ public final class StreamingTransformTranslator {
     return transform;
   }
 
-  private static <PT extends PTransform<?, ?>> Class<?> getPTransformOutputClazz(Class<PT> clazz) {
+  private static <TransformT extends PTransform<?, ?>> Class<?>
+  getPTransformOutputClazz(Class<TransformT> clazz) {
     Type[] types = ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments();
     return TypeToken.of(clazz).resolveType(types[1]).getRawType();
   }
@@ -407,7 +413,8 @@ public final class StreamingTransformTranslator {
     }
 
     @Override
-    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+    translate(Class<TransformT> clazz) {
       return getTransformEvaluator(clazz, rddTranslator);
     }
   }
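
The recurring rename across these hunks (PT to TransformT here; I and O to InputT and OutputT in the previous file) matches a naming rule for type parameters: descriptive names ending in "T" rather than opaque abbreviations. A self-contained sketch of the convention; the class and method below are hypothetical:

    package org.apache.beam.runners.spark.examples;

    import java.util.function.Function;

    /**
     * Hypothetical sketch: type parameters get descriptive names ending
     * in "T" (InputT, OutputT) instead of abbreviations (I, O).
     */
    public final class TypeParameterNaming {
      private TypeParameterNaming() { }

      // Before the rename this would read: static <I, O> O apply(Function<I, O> fn, I input)
      static <InputT, OutputT> OutputT apply(Function<InputT, OutputT> fn, InputT input) {
        return fn.apply(input);
      }
    }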

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
index 8c018d3..6e36102 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
@@ -54,12 +54,12 @@ public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.E
 
   // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
   @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void
+  protected <TransformT extends PTransform<? super PInput, POutput>> void
       doVisitTransform(TransformTreeNode node) {
     @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
+    TransformT transform = (TransformT) node.getTransform();
     @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
+    Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
     if (transformClass.isAssignableFrom(Window.Bound.class)) {
       WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
       if (windowFn instanceof FixedWindows) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 1824a9d..d3fa05a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -18,17 +18,21 @@
 
 package org.apache.beam.runners.spark.util;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.coders.Coder;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Broadcast helper.
+ */
 public abstract class BroadcastHelper<T> implements Serializable {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
index b1254d4..8c493f5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/ByteArray.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.runners.spark.util;
 
+import com.google.common.primitives.UnsignedBytes;
+
 import java.io.Serializable;
 import java.util.Arrays;
 
-import com.google.common.primitives.UnsignedBytes;
-
+/**
+ * Serializable byte array.
+ */
 public class ByteArray implements Serializable, Comparable<ByteArray> {
 
   private final byte[] value;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
index 9a8aa2e..654614a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SinglePrimitiveOutputPTransform.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PInput;
 
+/**
+ * A {@link PTransform} wrapping another transform.
+ */
 public class SinglePrimitiveOutputPTransform<T> extends PTransform<PInput, PCollection<T>> {
   private PTransform<PInput, PCollection<T>> transform;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
index f9b00cc..7b25e34 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java
@@ -35,6 +35,9 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Empty input test.
+ */
 public class EmptyInputTest {
 
   @Test
@@ -51,6 +54,9 @@ public class EmptyInputTest {
     res.close();
   }
 
+  /**
+   * Concat words serializable function used in the test.
+   */
   public static class ConcatWords implements SerializableFunction<Iterable<String>, String> {
     @Override
     public String apply(Iterable<String> input) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 4e9c0b8..eee120e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.beam.runners.spark;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
@@ -32,16 +35,14 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.io.FileUtils;
-import org.junit.rules.TemporaryFolder;
-import org.junit.Rule;
-import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.util.Arrays;
@@ -49,6 +50,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * Simple word count test.
+ */
 public class SimpleWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob",
@@ -133,6 +137,9 @@ public class SimpleWordCountTest {
     }
   }
 
+  /**
+   * A {@link PTransform} counting words.
+   */
   public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
     @Override
     public PCollection<String> apply(PCollection<String> lines) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 3643bac..88f4a06 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -18,19 +18,21 @@
 
 package org.apache.beam.runners.spark;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.util.ServiceLoader;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 /**
  * Test {@link SparkRunnerRegistrar}.
  */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 693e2c6..f358878 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -47,6 +47,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * Avro pipeline test.
+ */
 public class AvroPipelineTest {
 
   private File inputFile;
@@ -82,7 +85,8 @@ public class AvroPipelineTest {
     assertEquals(Lists.newArrayList(savedRecord), records);
   }
 
-  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+  private void populateGenericFile(List<GenericRecord> genericRecords,
+                                   Schema schema) throws IOException {
     FileOutputStream outputStream = new FileOutputStream(this.inputFile);
     GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<>(schema);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 85eeabd..8ce35c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -49,6 +49,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Number of shards test.
+ */
 public class NumShardsTest {
 
   private static final String[] WORDS_ARRAY = {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 0c8c6fc..eaa508c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -47,6 +47,9 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
+/**
+ * Hadoop file format pipeline test.
+ */
 public class HadoopFileFormatPipelineTest {
 
   private File inputFile;
@@ -69,16 +72,21 @@ public class HadoopFileFormatPipelineTest {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
     @SuppressWarnings("unchecked")
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
-        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
-    HadoopIO.Read.Bound<IntWritable,Text> read =
-        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
+        (Class<? extends FileInputFormat<IntWritable, Text>>)
+            (Class<?>) SequenceFileInputFormat.class;
+    HadoopIO.Read.Bound<IntWritable, Text> read =
+        HadoopIO.Read.from(inputFile.getAbsolutePath(),
+            inputFormatClass,
+            IntWritable.class,
+            Text.class);
     PCollection<KV<IntWritable, Text>> input = p.apply(read)
         .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
     @SuppressWarnings("unchecked")
     Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
-        (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
+        (Class<? extends FileOutputFormat<IntWritable, Text>>)
+            (Class<?>) TemplatedSequenceFileOutputFormat.class;
     @SuppressWarnings("unchecked")
-    HadoopIO.Write.Bound<IntWritable,Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
+    HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
     EvaluationResult res = SparkPipelineRunner.create().run(p);
@@ -86,7 +94,8 @@ public class HadoopFileFormatPipelineTest {
 
     IntWritable key = new IntWritable();
     Text value = new Text();
-    try (Reader reader = new Reader(new Configuration(), Reader.file(new Path(outputFile.toURI())))) {
+    try (Reader reader = new Reader(new Configuration(),
+        Reader.file(new Path(outputFile.toURI())))) {
       int i = 0;
       while (reader.next(key, value)) {
         assertEquals(i, key.get());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
index 55991a4..e1620db 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilderTest.java
@@ -28,6 +28,9 @@ import static org.junit.Assert.assertEquals;
 
 import org.junit.Test;
 
+/**
+ * Test of the {@link ShardNameBuilder}.
+ */
 public class ShardNameBuilderTest {
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
index a644673..ac64540 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java
@@ -37,6 +37,9 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Combine globally test.
+ */
 public class CombineGloballyTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -53,10 +56,14 @@ public class CombineGloballyTest {
     PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger()));
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
-    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output)));
+    assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi",
+        Iterables.getOnlyElement(res.get(output)));
     res.close();
   }
 
+  /**
+   * Word merger combine function used in the test.
+   */
   public static class WordMerger extends Combine.CombineFn<String, StringBuilder, String> {
 
     @Override
@@ -83,7 +90,7 @@ public class CombineGloballyTest {
 
     @Override
     public String extractOutput(StringBuilder accumulator) {
-      return accumulator != null ? accumulator.toString(): "";
+      return accumulator != null ? accumulator.toString() : "";
     }
 
     private static StringBuilder combine(StringBuilder accum, String datum) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
index 4e0bc5d..4e6c888 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java
@@ -42,6 +42,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Combine per key function test.
+ */
 public class CombinePerKeyTest {
 
     private static final List<String> WORDS =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
index ca97a96..0334bfe 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java
@@ -33,6 +33,9 @@ import org.junit.Test;
 
 import java.io.Serializable;
 
+/**
+ * DoFn output test.
+ */
 public class DoFnOutputTest implements Serializable {
   @Test
   public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 6a862c9..3402bb4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -52,6 +52,9 @@ import org.junit.Test;
 
 import java.util.Set;
 
+/**
+ * Multi-output word count test.
+ */
 public class MultiOutputWordCountTest {
 
   private static final TupleTag<String> upper = new TupleTag<>();
@@ -128,6 +131,9 @@ public class MultiOutputWordCountTest {
     }
   }
 
+  /**
+   * Count words {@link PTransform} used in the test.
+   */
   public static class CountWords extends PTransform<PCollection<String>, PCollectionTuple> {
 
     private final PCollectionView<String> regex;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
index 75d3fb2..22a2241 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java
@@ -51,8 +51,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+/**
+ * Serialization test.
+ */
 public class SerializationTest {
 
+  /**
+   * Simple String holder.
+   */
   public static class StringHolder { // not serializable
     private final String string;
 
@@ -80,12 +86,17 @@ public class SerializationTest {
     }
   }
 
+  /**
+   * UTF-8 coder for the simple String holder.
+   */
   public static class StringHolderUtf8Coder extends AtomicCoder<StringHolder> {
 
     private final StringUtf8Coder stringUtf8Coder = StringUtf8Coder.of();
 
     @Override
-    public void encode(StringHolder value, OutputStream outStream, Context context) throws IOException {
+    public void encode(StringHolder value,
+                       OutputStream outStream,
+                       Context context) throws IOException {
       stringUtf8Coder.encode(value.toString(), outStream, context);
     }
 
@@ -171,7 +182,8 @@ public class SerializationTest {
     }
   }
 
-  private static class CountWords extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
+  private static class CountWords
+      extends PTransform<PCollection<StringHolder>, PCollection<StringHolder>> {
     @Override
     public PCollection<StringHolder> apply(PCollection<StringHolder> lines) {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 14abbfc..5674900 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -38,6 +38,9 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.net.URI;
 
+/**
+ * Side effects test.
+ */
 public class SideEffectsTest implements Serializable {
 
   static class UserException extends RuntimeException {
@@ -66,7 +69,8 @@ public class SideEffectsTest implements Serializable {
 
       // TODO: remove the version check (and the setup and teardown methods) when we no
       // longer support Spark 1.3 or 1.4
-      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(), options.getAppName()).version();
+      String version = SparkContextFactory.getSparkContext(options.getSparkMaster(),
+          options.getAppName()).version();
       if (!version.startsWith("1.3.") && !version.startsWith("1.4.")) {
         assertTrue(e.getCause() instanceof UserException);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
index bf18486..59888c2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkPipelineOptionsTest.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Simple test of the Spark runner pipeline options.
+ */
 public class SparkPipelineOptionsTest {
   @Test
   public void testDefaultCreateMethod() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 0db8913..8062658 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -39,6 +39,9 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
+/**
+ * Windowed word count test.
+ */
 public class WindowedWordCountTest {
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 9152d72..15b2f39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -63,7 +63,7 @@ public class FlattenStreamingTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     PCollection<String> w1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index e1ff227..fd75e74 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -99,7 +99,7 @@ public class KafkaStreamingTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     Map<String, String> kafkaParams = ImmutableMap.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index ef224da..28133ca 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -42,6 +42,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Simple word count streaming test.
+ */
 public class SimpleStreamingWordCountTest {
 
   private static final String[] WORDS_ARRAY = {
@@ -58,7 +61,7 @@ public class SimpleStreamingWordCountTest {
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
     options.setRunner(SparkPipelineRunner.class);
-    options.setTimeout(TEST_TIMEOUT_MSEC);// run for one interval
+    options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 
     PCollection<String> inputWords =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index 2ade467..0fec573 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -39,6 +39,7 @@ import kafka.server.KafkaServer;
 import kafka.utils.Time;
 
 /**
+ * Embedded Kafka cluster.
  * https://gist.github.com/fjavieralba/7930018
  */
 public class EmbeddedKafkaCluster {
@@ -169,6 +170,9 @@ public class EmbeddedKafkaCluster {
     return "EmbeddedKafkaCluster{" + "brokerList='" + brokerList + "'}";
   }
 
+  /**
+   * Embedded Zookeeper.
+   */
   public static class EmbeddedZookeeper {
     private int port = -1;
     private int tickTime = 500;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 041cc50..3d8fc32 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -26,8 +26,9 @@ import org.junit.Assert;
  * success/failure counters.
  */
 public final class PAssertStreaming {
+
   /**
-   * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}
+   * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
    */
   static final String SUCCESS_COUNTER = "PAssertSuccess";
   static final String FAILURE_COUNTER = "PAssertFailure";



[2/2] incubator-beam git commit: [BEAM-267] Enable checkstyle in Spark runner

Posted by jb...@apache.org.
[BEAM-267] Enable checkstyle in Spark runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f3b0533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f3b0533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f3b0533

Branch: refs/heads/master
Commit: 0f3b053356f3321d08d4e2ee457a037df778bee4
Parents: ff825b0
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri May 6 17:47:46 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Sun May 8 07:59:26 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   2 -
 .../runners/spark/SparkPipelineOptions.java     |   3 +
 .../beam/runners/spark/SparkPipelineRunner.java |  13 +-
 .../runners/spark/SparkRunnerRegistrar.java     |   5 +-
 .../spark/aggregators/AggAccumParam.java        |   4 +
 .../spark/aggregators/NamedAggregators.java     |  64 +++--
 .../beam/runners/spark/coders/CoderHelpers.java |  13 +-
 .../runners/spark/coders/NullWritableCoder.java |  10 +-
 .../runners/spark/coders/WritableCoder.java     |  19 +-
 .../apache/beam/runners/spark/io/ConsoleIO.java |   7 +
 .../beam/runners/spark/io/CreateStream.java     |   6 +-
 .../apache/beam/runners/spark/io/KafkaIO.java   |  18 +-
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |  27 +-
 .../spark/io/hadoop/ShardNameBuilder.java       |   7 +-
 .../io/hadoop/ShardNameTemplateHelper.java      |   7 +-
 .../io/hadoop/TemplatedAvroKeyOutputFormat.java |   9 +-
 .../TemplatedSequenceFileOutputFormat.java      |   7 +-
 .../io/hadoop/TemplatedTextOutputFormat.java    |   7 +-
 .../runners/spark/translation/DoFnFunction.java |  38 +--
 .../spark/translation/EvaluationContext.java    |  26 +-
 .../spark/translation/MultiDoFnFunction.java    |  42 +--
 .../spark/translation/SparkContextFactory.java  |   3 +
 .../translation/SparkPipelineEvaluator.java     |  13 +-
 .../translation/SparkPipelineTranslator.java    |   3 +-
 .../spark/translation/SparkProcessContext.java  |  72 ++---
 .../spark/translation/SparkRuntimeContext.java  |  51 ++--
 .../spark/translation/TransformEvaluator.java   |  11 +-
 .../spark/translation/TransformTranslator.java  | 271 +++++++++++--------
 .../streaming/StreamingEvaluationContext.java   |  19 +-
 .../streaming/StreamingTransformTranslator.java |  85 +++---
 .../StreamingWindowPipelineDetector.java        |   6 +-
 .../runners/spark/util/BroadcastHelper.java     |  12 +-
 .../beam/runners/spark/util/ByteArray.java      |   7 +-
 .../util/SinglePrimitiveOutputPTransform.java   |   3 +
 .../beam/runners/spark/EmptyInputTest.java      |   6 +
 .../beam/runners/spark/SimpleWordCountTest.java |  21 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   6 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   3 +
 .../io/hadoop/HadoopFileFormatPipelineTest.java |  21 +-
 .../spark/io/hadoop/ShardNameBuilderTest.java   |   3 +
 .../spark/translation/CombineGloballyTest.java  |  11 +-
 .../spark/translation/CombinePerKeyTest.java    |   3 +
 .../spark/translation/DoFnOutputTest.java       |   3 +
 .../translation/MultiOutputWordCountTest.java   |   6 +
 .../spark/translation/SerializationTest.java    |  16 +-
 .../spark/translation/SideEffectsTest.java      |   6 +-
 .../translation/SparkPipelineOptionsTest.java   |   3 +
 .../translation/WindowedWordCountTest.java      |   3 +
 .../streaming/FlattenStreamingTest.java         |   2 +-
 .../streaming/KafkaStreamingTest.java           |   2 +-
 .../streaming/SimpleStreamingWordCountTest.java |   5 +-
 .../streaming/utils/EmbeddedKafkaCluster.java   |   4 +
 .../streaming/utils/PAssertStreaming.java       |   3 +-
 54 files changed, 634 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e673246..5daf1e1 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -211,12 +211,10 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
       </plugin>
-      <!-- Checkstyle errors for now
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
-      -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-source-plugin</artifactId>
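
Removing the comment markers re-activates the maven-checkstyle-plugin for this module; its version and rule set are presumably inherited from the parent pom's pluginManagement, which this mail does not show. Once enabled, the checks run as part of the build (they can also be invoked directly with mvn checkstyle:check), and the source diffs in both parts of this mail are what it takes to pass them: class-level Javadoc on public classes, grouped imports, descriptive type parameter names, a space after casts, and wrapped long lines. A hypothetical class that satisfies those checks:

    package org.apache.beam.runners.spark.examples;

    /**
     * Hypothetical checkstyle-friendly class: public classes carry Javadoc,
     * casts are followed by a space, and lines stay within the length limit.
     */
    public final class CheckstyleFriendly {
      private CheckstyleFriendly() { }

      /** Widens the value to CharSequence; note the space after the cast. */
      public static CharSequence widen(String value) {
        return (CharSequence) value;
      }
    }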

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index bdf832b..091382e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -24,6 +24,9 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 
+/**
+ * Spark runner pipeline options.
+ */
 public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
                                               ApplicationNameOptions {
   @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 8635cfb..bae4e53 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -120,14 +120,14 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
    */
   @SuppressWarnings("rawtypes")
   @Override
-  public <OT extends POutput, IT extends PInput> OT apply(
-      PTransform<IT, OT> transform, IT input) {
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
 
     if (transform instanceof GroupByKey) {
-      return (OT) ((PCollection) input).apply(
+      return (OutputT) ((PCollection) input).apply(
           new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
     } else if (transform instanceof Create.Values) {
-      return (OT) super.apply(
+      return (OutputT) super.apply(
         new SinglePrimitiveOutputPTransform((Create.Values) transform), input);
     } else {
       return super.apply(transform, input);
@@ -216,6 +216,9 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
     return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout());
   }
 
+  /**
+   * Pipeline evaluator.
+   */
   public abstract static class Evaluator implements Pipeline.PipelineVisitor {
     protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
 
@@ -275,7 +278,7 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
       doVisitTransform(node);
     }
 
-    protected abstract <PT extends PTransform<? super PInput, POutput>> void
+    protected abstract <TransformT extends PTransform<? super PInput, POutput>> void
         doVisitTransform(TransformTreeNode node);
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index 30142f9..9537ec6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -18,13 +18,14 @@
 
 package org.apache.beam.runners.spark;
 
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
 /**
  * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
  * {@link SparkPipelineRunner}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
index a75aeb3..9ce8b33 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggAccumParam.java
@@ -20,7 +20,11 @@ package org.apache.beam.runners.spark.aggregators;
 
 import org.apache.spark.AccumulatorParam;
 
+/**
+ * Aggregator accumulator param.
+ */
 public class AggAccumParam implements AccumulatorParam<NamedAggregators> {
+
   @Override
   public NamedAggregators addAccumulator(NamedAggregators current, NamedAggregators added) {
     return current.merge(added);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 64c473e..6ab6dc9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -18,6 +18,13 @@
 
 package org.apache.beam.runners.spark.aggregators;
 
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+
+import com.google.common.collect.ImmutableList;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -25,12 +32,6 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.TreeMap;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine;
-
 /**
  * This class wraps a map of named aggregators. Spark expects that all accumulators be declared
  * before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
@@ -96,8 +97,10 @@ public class NamedAggregators implements Serializable {
    * so require some casting.
    */
   @SuppressWarnings("unchecked")
-  private static <A, B, C> State<A, B, C> merge(State<?, ?, ?> s1, State<?, ?, ?> s2) {
-    return ((State<A, B, C>) s1).merge((State<A, B, C>) s2);
+  private static <InputT, InterT, OutputT> State<InputT, InterT, OutputT> merge(
+      State<?, ?, ?> s1,
+      State<?, ?, ?> s2) {
+    return ((State<InputT, InterT, OutputT>) s1).merge((State<InputT, InterT, OutputT>) s2);
   }
 
   @Override
@@ -110,38 +113,39 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * @param <IN>    Input data type
-   * @param <INTER> Intermediate data type (useful for averages)
-   * @param <OUT>   Output data type
+   * @param <InputT>  Input data type
+   * @param <InterT>  Intermediate data type (useful for averages)
+   * @param <OutputT> Output data type
    */
-  public interface State<IN, INTER, OUT> extends Serializable {
+  public interface State<InputT, InterT, OutputT> extends Serializable {
     /**
      * @param element new element to update state
      */
-    void update(IN element);
+    void update(InputT element);
 
-    State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other);
+    State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other);
 
-    INTER current();
+    InterT current();
 
-    OUT render();
+    OutputT render();
 
-    Combine.CombineFn<IN, INTER, OUT> getCombineFn();
+    Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
   }
 
   /**
   * =&gt; combineFunction in Dataflow.
    */
-  public static class CombineFunctionState<IN, INTER, OUT> implements State<IN, INTER, OUT> {
+  public static class CombineFunctionState<InputT, InterT, OutputT>
+      implements State<InputT, InterT, OutputT> {
 
-    private Combine.CombineFn<IN, INTER, OUT> combineFn;
-    private Coder<IN> inCoder;
+    private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
+    private Coder<InputT> inCoder;
     private SparkRuntimeContext ctxt;
-    private transient INTER state;
+    private transient InterT state;
 
     public CombineFunctionState(
-        Combine.CombineFn<IN, INTER, OUT> combineFn,
-        Coder<IN> inCoder,
+        Combine.CombineFn<InputT, InterT, OutputT> combineFn,
+        Coder<InputT> inCoder,
         SparkRuntimeContext ctxt) {
       this.combineFn = combineFn;
       this.inCoder = inCoder;
@@ -150,28 +154,28 @@ public class NamedAggregators implements Serializable {
     }
 
     @Override
-    public void update(IN element) {
+    public void update(InputT element) {
       combineFn.addInput(state, element);
     }
 
     @Override
-    public State<IN, INTER, OUT> merge(State<IN, INTER, OUT> other) {
+    public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
       this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
       return this;
     }
 
     @Override
-    public INTER current() {
+    public InterT current() {
       return state;
     }
 
     @Override
-    public OUT render() {
+    public OutputT render() {
       return combineFn.extractOutput(state);
     }
 
     @Override
-    public Combine.CombineFn<IN, INTER, OUT> getCombineFn() {
+    public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
       return combineFn;
     }
 
@@ -190,8 +194,8 @@ public class NamedAggregators implements Serializable {
     @SuppressWarnings("unchecked")
     private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
       ctxt = (SparkRuntimeContext) ois.readObject();
-      combineFn = (Combine.CombineFn<IN, INTER, OUT>) ois.readObject();
-      inCoder = (Coder<IN>) ois.readObject();
+      combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
+      inCoder = (Coder<InputT>) ois.readObject();
       try {
         state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
             .decode(ois, Coder.Context.NESTED);

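The InputT/InterT/OutputT triple introduced above mirrors the three type parameters of Beam's Combine.CombineFn, which CombineFunctionState wraps. A rough, self-contained example of such a function (a plain sum, not part of this patch) may help map the names to roles:

import org.apache.beam.sdk.transforms.Combine;

/** Hypothetical CombineFn: InputT = Long, InterT (accumulator) = Long, OutputT = Long. */
public class SumFn extends Combine.CombineFn<Long, Long, Long> {
  @Override
  public Long createAccumulator() {
    return 0L;                          // the intermediate (InterT) state
  }

  @Override
  public Long addInput(Long accum, Long input) {
    return accum + input;               // fold one InputT into the accumulator
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accums) {
    long sum = 0L;                      // combine partial results, like State.merge
    for (Long a : accums) {
      sum += a;
    }
    return sum;
  }

  @Override
  public Long extractOutput(Long accum) {
    return accum;                       // render() the OutputT
  }
}
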
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
index 7dc6af6..07587fc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java
@@ -18,17 +18,20 @@
 
 package org.apache.beam.runners.spark.coders;
 
+import org.apache.beam.runners.spark.util.ByteArray;
+import org.apache.beam.sdk.coders.Coder;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
-import com.google.common.collect.Iterables;
-import org.apache.beam.runners.spark.util.ByteArray;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 /**

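Most hunks in this patch are this import shuffle; no behavior changes. The grouping checkstyle now enforces, as visible above, is roughly: org.apache.beam.* first, then com.* (Guava, Jackson, AutoService), then other org.* (Spark, Hadoop), then java.*, with remaining roots such as scala and kafka last. Schematically (ImportOrderExample is made up; the method exists only to keep the imports live):

import org.apache.beam.sdk.coders.Coder;              // 1. Beam's own packages

import com.google.common.collect.Iterables;           // 2. com.* third-party

import org.apache.spark.api.java.JavaSparkContext;    // 3. other org.*

import java.util.List;                                // 4. java.*

import scala.Tuple2;                                  // 5. everything else

public final class ImportOrderExample {
  static Tuple2<Coder<String>, JavaSparkContext> sample(List<String> xs) {
    return Iterables.isEmpty(xs) ? null : new Tuple2<Coder<String>, JavaSparkContext>(null, null);
  }
}
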
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
index 529d67b..7cff325 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java
@@ -18,13 +18,17 @@
 
 package org.apache.beam.runners.spark.coders;
 
-import java.io.InputStream;
-import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.hadoop.io.NullWritable;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Simple writable coder for {@link NullWritable}.
+ */
 public final class NullWritableCoder extends WritableCoder<NullWritable> {
   private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
index f2836fe..4719e46 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
@@ -18,6 +18,16 @@
 
 package org.apache.beam.runners.spark.coders;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -26,15 +36,6 @@ import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
 /**
  * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index 41dc367..eefea77 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -29,6 +29,9 @@ public final class ConsoleIO {
   private ConsoleIO() {
   }
 
+  /**
+   * Write to the console.
+   */
   public static final class Write {
 
     private Write() {
@@ -42,6 +45,10 @@ public final class ConsoleIO {
       return new Unbound<>(num);
     }
 
+    /**
+     * {@link PTransform} writing a {@link PCollection} to the console.
+     * @param <T> the type of the elements in the {@link PCollection}
+     */
     public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
 
       private final int num;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index 917f8a0..e7a9971 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -17,12 +17,13 @@
  */
 package org.apache.beam.runners.spark.io;
 
-import com.google.common.base.Preconditions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Create an input stream from Queue.
  *
@@ -44,6 +45,9 @@ public final class CreateStream<T> {
     return new QueuedValues<>(queuedValues);
   }
 
+  /**
+   * {@link PTransform} for queueing values.
+   */
   public static final class QueuedValues<T> extends PTransform<PInput, PCollection<T>> {
 
     private final Iterable<Iterable<T>> queuedValues;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
index 1592bec..a97d86e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
@@ -17,17 +17,19 @@
  */
 package org.apache.beam.runners.spark.io;
 
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-import kafka.serializer.Decoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 
+import com.google.common.base.Preconditions;
+
+import java.util.Map;
+import java.util.Set;
+
+import kafka.serializer.Decoder;
+
 /**
  * Read stream from Kafka.
  */
@@ -36,6 +38,9 @@ public final class KafkaIO {
   private KafkaIO() {
   }
 
+  /**
+   * Read operation from Kafka topics.
+   */
   public static final class Read {
 
     private Read() {
@@ -62,6 +67,9 @@ public final class KafkaIO {
       return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
     }
 
+    /**
+     * A {@link PTransform} reading from Kafka topics and providing a {@link PCollection}.
+     */
     public static class Unbound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
 
       private final Class<? extends Decoder<K>> keyDecoderClass;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 5b50d3e..00c10d4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.runners.spark.io.hadoop;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -28,14 +24,26 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Spark native HadoopIO.
+ */
 public final class HadoopIO {
 
   private HadoopIO() {
   }
 
+  /**
+   * Read operation from HDFS.
+   */
   public static final class Read {
 
     private Read() {
@@ -46,6 +54,11 @@ public final class HadoopIO {
       return new Bound<>(filepattern, format, key, value);
     }
 
+    /**
+     * A {@link PTransform} reading a bounded collection of data from HDFS.
+     * @param <K> the type of the keys
+     * @param <V> the type of the values
+     */
     public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
 
       private final String filepattern;
@@ -95,6 +108,9 @@ public final class HadoopIO {
 
   }
 
+  /**
+   * Write operation to HDFS.
+   */
   public static final class Write {
 
     private Write() {
@@ -105,6 +121,9 @@ public final class HadoopIO {
       return new Bound<>(filenamePrefix, format, key, value);
     }
 
+    /**
+     * A {@link PTransform} writing a {@link PCollection} to HDFS.
+     */
     public static class Bound<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
 
       /** The filename to write to. */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
index c768340..6b36427 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameBuilder.java
@@ -18,11 +18,14 @@
 
 package org.apache.beam.runners.spark.io.hadoop;
 
+import org.apache.hadoop.fs.Path;
+
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.Path;
-
+/**
+ * Shard name builder.
+ */
 public final class ShardNameBuilder {
 
   private ShardNameBuilder() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
index 2267ccb..d06b016 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/ShardNameTemplateHelper.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.spark.io.hadoop;
 
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardNumber;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
@@ -30,6 +28,11 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
+/**
+ * Shard name template helper.
+ */
 public final class ShardNameTemplateHelper {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
index b755928..f747e7b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedAvroKeyOutputFormat.java
@@ -18,14 +18,17 @@
 
 package org.apache.beam.runners.spark.io.hadoop;
 
-import java.io.IOException;
-import java.io.OutputStream;
-
 import org.apache.avro.mapreduce.AvroKeyOutputFormat;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Templated Avro key output format.
+ */
 public class TemplatedAvroKeyOutputFormat<T> extends AvroKeyOutputFormat<T>
     implements ShardNameTemplateAware {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
index 35b6163..bd2ee4d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedSequenceFileOutputFormat.java
@@ -18,13 +18,16 @@
 
 package org.apache.beam.runners.spark.io.hadoop;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
+import java.io.IOException;
+
+/**
+ * Templated sequence file output format.
+ */
 public class TemplatedSequenceFileOutputFormat<K, V> extends SequenceFileOutputFormat<K, V>
     implements ShardNameTemplateAware {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
index 8f0c0d2..8725a95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/TemplatedTextOutputFormat.java
@@ -18,13 +18,16 @@
 
 package org.apache.beam.runners.spark.io.hadoop;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
+import java.io.IOException;
+
+/**
+ * Templated text output format.
+ */
 public class TemplatedTextOutputFormat<K, V> extends TextOutputFormat<K, V>
     implements ShardNameTemplateAware {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index fbc9e98..b5888bd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -18,26 +18,28 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.apache.spark.api.java.function.FlatMapFunction;
 
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Dataflow's Do functions correspond to Spark's FlatMap functions.
  *
- * @param <I> Input element type.
- * @param <O> Output element type.
+ * @param <InputT> Input element type.
+ * @param <OutputT> Output element type.
  */
-public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
-    WindowedValue<O>> {
-  private final DoFn<I, O> mFunction;
+public class DoFnFunction<InputT, OutputT>
+    implements FlatMapFunction<Iterator<WindowedValue<InputT>>,
+    WindowedValue<OutputT>> {
+  private final DoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
@@ -46,7 +48,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
    * @param runtime    Runtime to apply function in.
    * @param sideInputs Side inputs used in DoFunction.
    */
-  public DoFnFunction(DoFn<I, O> fn,
+  public DoFnFunction(DoFn<InputT, OutputT> fn,
                SparkRuntimeContext runtime,
                Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     this.mFunction = fn;
@@ -55,7 +57,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
   }
 
   @Override
-  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
+  public Iterable<WindowedValue<OutputT>> call(Iterator<WindowedValue<InputT>> iter) throws
       Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
     ctxt.setup();
@@ -63,23 +65,23 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
     return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
+  private class ProcCtxt extends SparkProcessContext<InputT, OutputT, WindowedValue<OutputT>> {
 
-    private final List<WindowedValue<O>> outputs = new LinkedList<>();
+    private final List<WindowedValue<OutputT>> outputs = new LinkedList<>();
 
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+    ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
       super(fn, runtimeContext, sideInputs);
     }
 
     @Override
-    public synchronized void output(O o) {
+    public synchronized void output(OutputT o) {
       outputs.add(windowedValue != null ? windowedValue.withValue(o) :
           WindowedValue.valueInGlobalWindow(o));
     }
 
     @Override
-    public synchronized void output(WindowedValue<O> o) {
+    public synchronized void output(WindowedValue<OutputT> o) {
       outputs.add(o);
     }
 
@@ -89,7 +91,7 @@ public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValu
     }
 
     @Override
-    protected Iterator<WindowedValue<O>> getOutputIterator() {
+    protected Iterator<WindowedValue<OutputT>> getOutputIterator() {
       return outputs.iterator();
     }
   }

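Beyond imports, the bulk of the patch is this rename: single-letter type variables (I, O, V, PT, IN, INTER, OUT) become descriptive names suffixed with T (InputT, OutputT, ValueT, TransformT, InterT), so a signature like DoFnFunction's reads without consulting the declaration. A toy before/after, outside Beam:

// Before: the reader has to track what I and O stand for.
interface Mapper<I, O> {
  O map(I input);
}

// After: the convention names the role and suffixes it with T.
interface RenamedMapper<InputT, OutputT> {
  OutputT map(InputT input);
}
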
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 6d49bd3..d737f5e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -20,14 +20,6 @@ package org.apache.beam.runners.spark.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.Pipeline;
@@ -46,9 +38,19 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
 * Evaluation context allows us to define how pipeline instructions are executed.
@@ -151,19 +153,19 @@ public class EvaluationContext implements EvaluationResult {
     return currentTransform;
   }
 
-  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+  protected <T extends PInput> T getInput(PTransform<T, ?> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
-    I input = (I) currentTransform.getInput();
+    T input = (T) currentTransform.getInput();
     return input;
   }
 
-  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+  protected <T extends POutput> T getOutput(PTransform<?, T> transform) {
     checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
         "can only be called with current transform");
     @SuppressWarnings("unchecked")
-    O output = (O) currentTransform.getOutput();
+    T output = (T) currentTransform.getOutput();
     return output;
   }
 

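getInput/getOutput above pair a bounded type parameter with a narrowly scoped @SuppressWarnings("unchecked") cast; the cast is justified only by the checkArgument guard that the caller really is the current transform. The same pattern in isolation (toy types, not the Beam classes):

import static com.google.common.base.Preconditions.checkArgument;

/** Toy stand-in for the applied-transform holder; values are stored untyped. */
public final class CurrentHolder {
  private final Object owner;
  private final Object input;

  public CurrentHolder(Object owner, Object input) {
    this.owner = owner;
    this.input = input;
  }

  /** Narrows the stored input to the caller's expected type. */
  public <T> T inputOf(Object transform) {
    checkArgument(owner == transform, "can only be called with current transform");
    @SuppressWarnings("unchecked")
    T result = (T) input;   // safe by the invariant checked above
    return result;
  }
}
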
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 2641e31..daa767d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -18,39 +18,42 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.beam.runners.spark.util.BroadcastHelper;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.beam.runners.spark.util.BroadcastHelper;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
+
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.joda.time.Instant;
+
+import java.util.Iterator;
+import java.util.Map;
+
 import scala.Tuple2;
 
 /**
  * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the
  * underlying data with multiple TupleTags.
  *
- * @param <I> Input type for DoFunction.
- * @param <O> Output type for DoFunction.
+ * @param <InputT> Input type for DoFunction.
+ * @param <OutputT> Output type for DoFunction.
  */
-class MultiDoFnFunction<I, O>
-    implements PairFlatMapFunction<Iterator<WindowedValue<I>>, TupleTag<?>, WindowedValue<?>> {
-  private final DoFn<I, O> mFunction;
+class MultiDoFnFunction<InputT, OutputT>
+    implements PairFlatMapFunction<Iterator<WindowedValue<InputT>>, TupleTag<?>, WindowedValue<?>> {
+  private final DoFn<InputT, OutputT> mFunction;
   private final SparkRuntimeContext mRuntimeContext;
-  private final TupleTag<O> mMainOutputTag;
+  private final TupleTag<OutputT> mMainOutputTag;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
   MultiDoFnFunction(
-      DoFn<I, O> fn,
+      DoFn<InputT, OutputT> fn,
       SparkRuntimeContext runtimeContext,
-      TupleTag<O> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     this.mFunction = fn;
     this.mRuntimeContext = runtimeContext;
@@ -60,29 +63,30 @@ class MultiDoFnFunction<I, O>
 
   @Override
   public Iterable<Tuple2<TupleTag<?>, WindowedValue<?>>>
-      call(Iterator<WindowedValue<I>> iter) throws Exception {
+      call(Iterator<WindowedValue<InputT>> iter) throws Exception {
     ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
     mFunction.startBundle(ctxt);
     ctxt.setup();
     return ctxt.getOutputIterable(iter, mFunction);
   }
 
-  private class ProcCtxt extends SparkProcessContext<I, O, Tuple2<TupleTag<?>, WindowedValue<?>>> {
+  private class ProcCtxt
+      extends SparkProcessContext<InputT, OutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
     private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
 
-    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+    ProcCtxt(DoFn<InputT, OutputT> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
         BroadcastHelper<?>> sideInputs) {
       super(fn, runtimeContext, sideInputs);
     }
 
     @Override
-    public synchronized void output(O o) {
+    public synchronized void output(OutputT o) {
       outputs.put(mMainOutputTag, windowedValue.withValue(o));
     }
 
     @Override
-    public synchronized void output(WindowedValue<O> o) {
+    public synchronized void output(WindowedValue<OutputT> o) {
       outputs.put(mMainOutputTag, o);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 2bc8a7b..225afb8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -22,6 +22,9 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.serializer.KryoSerializer;
 
+/**
+ * The Spark context factory.
+ */
 public final class SparkContextFactory {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
index 0f47af6..609c413 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java
@@ -38,16 +38,17 @@ public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator
   }
 
   @Override
-  protected <PT extends PTransform<? super PInput, POutput>> void doVisitTransform(TransformTreeNode
+  protected <TransformT extends PTransform<? super PInput, POutput>>
+  void doVisitTransform(
+      TransformTreeNode node) {
     @SuppressWarnings("unchecked")
-    PT transform = (PT) node.getTransform();
+    TransformT transform = (TransformT) node.getTransform();
     @SuppressWarnings("unchecked")
-    Class<PT> transformClass = (Class<PT>) (Class<?>) transform.getClass();
-    @SuppressWarnings("unchecked") TransformEvaluator<PT> evaluator =
-        (TransformEvaluator<PT>) translator.translate(transformClass);
+    Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
+    @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator =
+        (TransformEvaluator<TransformT>) translator.translate(transformClass);
     LOG.info("Evaluating {}", transform);
-    AppliedPTransform<PInput, POutput, PT> appliedTransform =
+    AppliedPTransform<PInput, POutput, TransformT> appliedTransform =
         AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
     ctxt.setCurrentTransform(appliedTransform);
     evaluator.evaluate(transform, ctxt);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
index 77849a9..997940b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineTranslator.java
@@ -26,5 +26,6 @@ public interface SparkPipelineTranslator {
 
   boolean hasTranslation(Class<? extends PTransform<?, ?>> clazz);
 
-  <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz);
+  <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+  translate(Class<TransformT> clazz);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 5d4ece6..4f90a12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -18,13 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -40,21 +33,34 @@ import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessContext {
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Spark runner process context.
+ */
+public abstract class SparkProcessContext<InputT, OutputT, ValueT>
+    extends DoFn<InputT, OutputT>.ProcessContext {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkProcessContext.class);
 
-  private final DoFn<I, O> fn;
+  private final DoFn<InputT, OutputT> fn;
   private final SparkRuntimeContext mRuntimeContext;
   private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
 
-  protected WindowedValue<I> windowedValue;
+  protected WindowedValue<InputT> windowedValue;
 
-  SparkProcessContext(DoFn<I, O> fn,
+  SparkProcessContext(DoFn<InputT, OutputT> fn,
       SparkRuntimeContext runtime,
       Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
     fn.super();
@@ -82,9 +88,9 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
   }
 
   @Override
-  public abstract void output(O output);
+  public abstract void output(OutputT output);
 
-  public abstract void output(WindowedValue<O> output);
+  public abstract void output(WindowedValue<OutputT> output);
 
   @Override
   public <T> void sideOutput(TupleTag<T> tupleTag, T t) {
@@ -104,19 +110,20 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
   }
 
   @Override
-  public <AI, AO> Aggregator<AI, AO> createAggregatorInternal(
+  public <AggregatorInputT, AggregatorOutputT>
+  Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal(
       String named,
-      Combine.CombineFn<AI, ?, AO> combineFn) {
+      Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combineFn) {
     return mRuntimeContext.createAggregator(named, combineFn);
   }
 
   @Override
-  public I element() {
+  public InputT element() {
     return windowedValue.getValue();
   }
 
   @Override
-  public void outputWithTimestamp(O output, Instant timestamp) {
+  public void outputWithTimestamp(OutputT output, Instant timestamp) {
     output(WindowedValue.of(output, timestamp,
         windowedValue.getWindows(), windowedValue.getPane()));
   }
@@ -141,8 +148,8 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
   }
 
   @Override
-  public WindowingInternals<I, O> windowingInternals() {
-    return new WindowingInternals<I, O>() {
+  public WindowingInternals<InputT, OutputT> windowingInternals() {
+    return new WindowingInternals<InputT, OutputT>() {
 
       @Override
       public Collection<? extends BoundedWindow> windows() {
@@ -150,7 +157,7 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
       }
 
       @Override
-      public void outputWindowedValue(O output, Instant timestamp, Collection<?
+      public void outputWindowedValue(OutputT output, Instant timestamp, Collection<?
           extends BoundedWindow> windows, PaneInfo paneInfo) {
         output(WindowedValue.of(output, timestamp, windows, paneInfo));
       }
@@ -190,33 +197,33 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
   }
 
   protected abstract void clearOutput();
-  protected abstract Iterator<V> getOutputIterator();
+  protected abstract Iterator<ValueT> getOutputIterator();
 
-  protected Iterable<V> getOutputIterable(final Iterator<WindowedValue<I>> iter,
-      final DoFn<I, O> doFn) {
-    return new Iterable<V>() {
+  protected Iterable<ValueT> getOutputIterable(final Iterator<WindowedValue<InputT>> iter,
+                                               final DoFn<InputT, OutputT> doFn) {
+    return new Iterable<ValueT>() {
       @Override
-      public Iterator<V> iterator() {
+      public Iterator<ValueT> iterator() {
         return new ProcCtxtIterator(iter, doFn);
       }
     };
   }
 
-  private class ProcCtxtIterator extends AbstractIterator<V> {
+  private class ProcCtxtIterator extends AbstractIterator<ValueT> {
 
-    private final Iterator<WindowedValue<I>> inputIterator;
-    private final DoFn<I, O> doFn;
-    private Iterator<V> outputIterator;
+    private final Iterator<WindowedValue<InputT>> inputIterator;
+    private final DoFn<InputT, OutputT> doFn;
+    private Iterator<ValueT> outputIterator;
     private boolean calledFinish;
 
-    ProcCtxtIterator(Iterator<WindowedValue<I>> iterator, DoFn<I, O> doFn) {
+    ProcCtxtIterator(Iterator<WindowedValue<InputT>> iterator, DoFn<InputT, OutputT> doFn) {
       this.inputIterator = iterator;
       this.doFn = doFn;
       this.outputIterator = getOutputIterator();
     }
 
     @Override
-    protected V computeNext() {
+    protected ValueT computeNext() {
       // Process each element from the (input) iterator, which produces zero, one or more
       // output elements (of type ValueT) in the output iterator. Note that the output
       // collection (and iterator) is reset between each call to processElement, so the
@@ -253,6 +260,9 @@ public abstract class SparkProcessContext<I, O, V> extends DoFn<I, O>.ProcessCon
     }
   }
 
+  /**
+   * Spark process runtime exception.
+   */
   public static class SparkProcessException extends RuntimeException {
     SparkProcessException(Throwable t) {
       super(t);

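ProcCtxtIterator above leans on Guava's AbstractIterator: computeNext() either returns the next element or calls endOfData(), and Guava supplies the hasNext/next bookkeeping. A self-contained sketch of that contract, unrelated to Beam:

import com.google.common.collect.AbstractIterator;

import java.util.Iterator;

/** Drains a source iterator lazily, skipping nulls, via computeNext(). */
public class SkipNulls<T> extends AbstractIterator<T> {
  private final Iterator<T> source;

  public SkipNulls(Iterator<T> source) {
    this.source = source;
  }

  @Override
  protected T computeNext() {
    while (source.hasNext()) {
      T next = source.next();
      if (next != null) {
        return next;        // hand the caller one element
      }
    }
    return endOfData();     // signal exhaustion, as ProcCtxtIterator does
  }
}
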
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index ea125de..46f5b33 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -18,15 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import org.apache.beam.runners.spark.aggregators.AggAccumParam;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.sdk.Pipeline;
@@ -41,9 +32,20 @@ import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * The SparkRuntimeContext allows us to define useful features on the client side before our
@@ -122,22 +124,22 @@ public class SparkRuntimeContext implements Serializable {
    *
    * @param named     Name of aggregator.
    * @param combineFn Combine function used in aggregation.
-   * @param <IN>      Type of inputs to aggregator.
-   * @param <INTER>   Intermediate data type
-   * @param <OUT>     Type of aggregator outputs.
+   * @param <InputT>  Type of inputs to aggregator.
+   * @param <InterT>  Intermediate data type
+   * @param <OutputT> Type of aggregator outputs.
    * @return Specified aggregator
    */
-  public synchronized <IN, INTER, OUT> Aggregator<IN, OUT> createAggregator(
+  public synchronized <InputT, InterT, OutputT> Aggregator<InputT, OutputT> createAggregator(
       String named,
-      Combine.CombineFn<? super IN, INTER, OUT> combineFn) {
+      Combine.CombineFn<? super InputT, InterT, OutputT> combineFn) {
     @SuppressWarnings("unchecked")
-    Aggregator<IN, OUT> aggregator = (Aggregator<IN, OUT>) aggregators.get(named);
+    Aggregator<InputT, OutputT> aggregator = (Aggregator<InputT, OutputT>) aggregators.get(named);
     if (aggregator == null) {
       @SuppressWarnings("unchecked")
-      NamedAggregators.CombineFunctionState<IN, INTER, OUT> state =
+      NamedAggregators.CombineFunctionState<InputT, InterT, OutputT> state =
           new NamedAggregators.CombineFunctionState<>(
-              (Combine.CombineFn<IN, INTER, OUT>) combineFn,
-              (Coder<IN>) getCoder(combineFn),
+              (Combine.CombineFn<InputT, InterT, OutputT>) combineFn,
+              (Coder<InputT>) getCoder(combineFn),
               this);
       accum.add(new NamedAggregators(named, state));
       aggregator = new SparkAggregator<>(named, state);
@@ -186,13 +188,14 @@ public class SparkRuntimeContext implements Serializable {
   /**
    * Initialize spark aggregators exactly once.
    *
-   * @param <IN> Type of element fed in to aggregator.
+   * @param <InputT> Type of element fed in to aggregator.
    */
-  private static class SparkAggregator<IN, OUT> implements Aggregator<IN, OUT>, Serializable {
+  private static class SparkAggregator<InputT, OutputT>
+      implements Aggregator<InputT, OutputT>, Serializable {
     private final String name;
-    private final NamedAggregators.State<IN, ?, OUT> state;
+    private final NamedAggregators.State<InputT, ?, OutputT> state;
 
-    SparkAggregator(String name, NamedAggregators.State<IN, ?, OUT> state) {
+    SparkAggregator(String name, NamedAggregators.State<InputT, ?, OutputT> state) {
       this.name = name;
       this.state = state;
     }
@@ -203,12 +206,12 @@ public class SparkRuntimeContext implements Serializable {
     }
 
     @Override
-    public void addValue(IN elem) {
+    public void addValue(InputT elem) {
       state.update(elem);
     }
 
     @Override
-    public Combine.CombineFn<IN, ?, OUT> getCombineFn() {
+    public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
       return state.getCombineFn();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
index 30ab076..c5c7128 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
@@ -18,10 +18,13 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import java.io.Serializable;
-
 import org.apache.beam.sdk.transforms.PTransform;
 
-public interface TransformEvaluator<PT extends PTransform<?, ?>> extends Serializable {
-  void evaluate(PT transform, EvaluationContext context);
+import java.io.Serializable;
+
+/**
+ * Describes a {@link PTransform} evaluator.
+ */
+public interface TransformEvaluator<TransformT extends PTransform<?, ?>> extends Serializable {
+  void evaluate(TransformT transform, EvaluationContext context);
 }

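TransformEvaluator is the runner's dispatch hook: SparkPipelineTranslator.translate maps a PTransform class to the evaluator that knows how to run it, and doVisitTransform (earlier in this patch) invokes it. A toy version of that class-keyed dispatch, with made-up stand-ins rather than the Beam types:

import java.util.HashMap;
import java.util.Map;

/** Toy registry pairing a class with the handler that evaluates it. */
public final class HandlerRegistry {
  /** Stand-in for TransformEvaluator. */
  public interface Handler<T> {
    void evaluate(T transform);
  }

  private final Map<Class<?>, Handler<?>> handlers = new HashMap<>();

  public <T> void register(Class<T> clazz, Handler<T> handler) {
    handlers.put(clazz, handler);
  }

  @SuppressWarnings("unchecked")
  public <T> Handler<T> translate(Class<T> clazz) {
    return (Handler<T>) handlers.get(clazz);  // safe: register() pairs key and value
  }
}
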
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f3b0533/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0366856..b462d35 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -23,19 +23,6 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
 import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
 
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
 import org.apache.beam.runners.spark.io.hadoop.ShardNameTemplateHelper;
@@ -68,6 +55,14 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -81,6 +76,14 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import scala.Tuple2;
 
 /**
@@ -91,6 +94,9 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
+  /**
+   * Reflection-based getter for a class's fields.
+   */
   public static class FieldGetter {
     private final Map<String, Field> fields;
 
@@ -157,14 +163,16 @@ public final class TransformTranslator {
 
   private static final FieldGetter GROUPED_FG = new FieldGetter(Combine.GroupedValues.class);
 
-  private static <K, VI, VO> TransformEvaluator<Combine.GroupedValues<K, VI, VO>> grouped() {
-    return new TransformEvaluator<Combine.GroupedValues<K, VI, VO>>() {
+  private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
+  grouped() {
+    return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
       @Override
-      public void evaluate(Combine.GroupedValues<K, VI, VO> transform, EvaluationContext context) {
-        Combine.KeyedCombineFn<K, VI, ?, VO> keyed = GROUPED_FG.get("fn", transform);
+      public void evaluate(Combine.GroupedValues<K, InputT, OutputT> transform,
+                           EvaluationContext context) {
+        Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed = GROUPED_FG.get("fn", transform);
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<KV<K, Iterable<VI>>>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<KV<K, Iterable<InputT>>>, ?>) context.getInputRDD(transform);
         context.setOutputRDD(transform,
             inRDD.map(new KVFunction<>(keyed)));
       }
@@ -173,19 +181,21 @@ public final class TransformTranslator {
 
   private static final FieldGetter COMBINE_GLOBALLY_FG = new FieldGetter(Combine.Globally.class);
 
-  private static <I, A, O> TransformEvaluator<Combine.Globally<I, O>> combineGlobally() {
-    return new TransformEvaluator<Combine.Globally<I, O>>() {
+  private static <InputT, AccumT, OutputT> TransformEvaluator<Combine.Globally<InputT, OutputT>>
+  combineGlobally() {
+    return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
 
       @Override
-      public void evaluate(Combine.Globally<I, O> transform, EvaluationContext context) {
-        final Combine.CombineFn<I, A, O> globally = COMBINE_GLOBALLY_FG.get("fn", transform);
+      public void evaluate(Combine.Globally<InputT, OutputT> transform, EvaluationContext context) {
+        final Combine.CombineFn<InputT, AccumT, OutputT> globally =
+            COMBINE_GLOBALLY_FG.get("fn", transform);
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<InputT>, ?> inRdd =
+            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
 
-        final Coder<I> iCoder = context.getInput(transform).getCoder();
-        final Coder<A> aCoder;
+        final Coder<InputT> iCoder = context.getInput(transform).getCoder();
+        final Coder<AccumT> aCoder;
         try {
           aCoder = globally.getAccumulatorCoder(
               context.getPipeline().getCoderRegistry(), iCoder);
@@ -196,86 +206,92 @@ public final class TransformTranslator {
         // Use coders to convert objects in the PCollection to byte arrays, so they
         // can be transferred over the network for the shuffle.
         JavaRDD<byte[]> inRddBytes = inRdd
-            .map(WindowingHelpers.<I>unwindowFunction())
+            .map(WindowingHelpers.<InputT>unwindowFunction())
             .map(CoderHelpers.toByteFunction(iCoder));
 
-        /*A*/ byte[] acc = inRddBytes.aggregate(
+        /*AccumT*/ byte[] acc = inRddBytes.aggregate(
             CoderHelpers.toByteArray(globally.createAccumulator(), aCoder),
-            new Function2</*A*/ byte[], /*I*/ byte[], /*A*/ byte[]>() {
+            new Function2</*AccumT*/ byte[], /*InputT*/ byte[], /*AccumT*/ byte[]>() {
               @Override
-              public /*A*/ byte[] call(/*A*/ byte[] ab, /*I*/ byte[] ib) throws Exception {
-                A a = CoderHelpers.fromByteArray(ab, aCoder);
-                I i = CoderHelpers.fromByteArray(ib, iCoder);
+              public /*AccumT*/ byte[] call(/*AccumT*/ byte[] ab, /*InputT*/ byte[] ib)
+                  throws Exception {
+                AccumT a = CoderHelpers.fromByteArray(ab, aCoder);
+                InputT i = CoderHelpers.fromByteArray(ib, iCoder);
                 return CoderHelpers.toByteArray(globally.addInput(a, i), aCoder);
               }
             },
-            new Function2</*A*/ byte[], /*A*/ byte[], /*A*/ byte[]>() {
+            new Function2</*AccumT*/ byte[], /*AccumT*/ byte[], /*AccumT*/ byte[]>() {
               @Override
-              public /*A*/ byte[] call(/*A*/ byte[] a1b, /*A*/ byte[] a2b) throws Exception {
-                A a1 = CoderHelpers.fromByteArray(a1b, aCoder);
-                A a2 = CoderHelpers.fromByteArray(a2b, aCoder);
+              public /*AccumT*/ byte[] call(/*AccumT*/ byte[] a1b, /*AccumT*/ byte[] a2b)
+                  throws Exception {
+                AccumT a1 = CoderHelpers.fromByteArray(a1b, aCoder);
+                AccumT a2 = CoderHelpers.fromByteArray(a2b, aCoder);
                 // don't use Guava's ImmutableList.of as values may be null
-                List<A> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
-                A merged = globally.mergeAccumulators(accumulators);
+                List<AccumT> accumulators = Collections.unmodifiableList(Arrays.asList(a1, a2));
+                AccumT merged = globally.mergeAccumulators(accumulators);
                 return CoderHelpers.toByteArray(merged, aCoder);
               }
             }
         );
-        O output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
+        OutputT output = globally.extractOutput(CoderHelpers.fromByteArray(acc, aCoder));
 
-        Coder<O> coder = context.getOutput(transform).getCoder();
+        Coder<OutputT> coder = context.getOutput(transform).getCoder();
         JavaRDD<byte[]> outRdd = context.getSparkContext().parallelize(
             // don't use Guava's ImmutableList.of as output may be null
             CoderHelpers.toByteArrays(Collections.singleton(output), coder));
         context.setOutputRDD(transform, outRdd.map(CoderHelpers.fromByteFunction(coder))
-            .map(WindowingHelpers.<O>windowFunction()));
+            .map(WindowingHelpers.<OutputT>windowFunction()));
       }
     };
   }
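
A note on the pattern above: Spark's JavaRDD#aggregate takes a zero value, a
seqOp and a combOp, which line up one-to-one with CombineFn's
createAccumulator, addInput and mergeAccumulators; the byte[] round-trips
through CoderHelpers exist only so the accumulators can cross the network for
the shuffle. A minimal sketch of the same pattern without coders (a
hypothetical integer sum, not part of this patch; "jsc" is an assumed
JavaSparkContext, imports from java.util and
org.apache.spark.api.java.function assumed):

    // Hypothetical illustration only.
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));
    Integer sum = rdd.aggregate(
        0,                                            // zero accumulator
        new Function2<Integer, Integer, Integer>() {  // seqOp ~ addInput
          @Override
          public Integer call(Integer acc, Integer in) {
            return acc + in;
          }
        },
        new Function2<Integer, Integer, Integer>() {  // combOp ~ mergeAccumulators
          @Override
          public Integer call(Integer a1, Integer a2) {
            return a1 + a2;
          }
        });

The Arrays.asList wrapper around the accumulator pair above is deliberate:
Guava's ImmutableList.of rejects nulls with a NullPointerException, and a
CombineFn's accumulators may legitimately be null.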
 
   private static final FieldGetter COMBINE_PERKEY_FG = new FieldGetter(Combine.PerKey.class);
 
-  private static <K, VI, VA, VO> TransformEvaluator<Combine.PerKey<K, VI, VO>> combinePerKey() {
-    return new TransformEvaluator<Combine.PerKey<K, VI, VO>>() {
+  private static <K, InputT, AccumT, OutputT>
+  TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
+    return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
       @Override
-      public void evaluate(Combine.PerKey<K, VI, VO> transform, EvaluationContext context) {
-        final Combine.KeyedCombineFn<K, VI, VA, VO> keyed =
+      public void evaluate(Combine.PerKey<K, InputT, OutputT> transform,
+                           EvaluationContext context) {
+        final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> keyed =
             COMBINE_PERKEY_FG.get("fn", transform);
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<KV<K, VI>>, ?> inRdd =
-            (JavaRDDLike<WindowedValue<KV<K, VI>>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<KV<K, InputT>>, ?> inRdd =
+            (JavaRDDLike<WindowedValue<KV<K, InputT>>, ?>) context.getInputRDD(transform);
 
         @SuppressWarnings("unchecked")
-        KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+        KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>)
+            context.getInput(transform).getCoder();
         Coder<K> keyCoder = inputCoder.getKeyCoder();
-        Coder<VI> viCoder = inputCoder.getValueCoder();
-        Coder<VA> vaCoder;
+        Coder<InputT> viCoder = inputCoder.getValueCoder();
+        Coder<AccumT> vaCoder;
         try {
           vaCoder = keyed.getAccumulatorCoder(
               context.getPipeline().getCoderRegistry(), keyCoder, viCoder);
         } catch (CannotProvideCoderException e) {
           throw new IllegalStateException("Could not determine coder for accumulator", e);
         }
-        Coder<KV<K, VI>> kviCoder = KvCoder.of(keyCoder, viCoder);
-        Coder<KV<K, VA>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
+        Coder<KV<K, InputT>> kviCoder = KvCoder.of(keyCoder, viCoder);
+        Coder<KV<K, AccumT>> kvaCoder = KvCoder.of(keyCoder, vaCoder);
 
         // We need to duplicate K as both the key of the JavaPairRDD as well as inside the value,
         // since the functions passed to combineByKey don't receive the associated key of each
         // value, and we need to map back into methods in Combine.KeyedCombineFn, which each
-        // require the key in addition to the VI's and VA's being merged/accumulated. Once Spark
-        // provides a way to include keys in the arguments of combine/merge functions, we won't
-        // need to duplicate the keys anymore.
+        // require the key in addition to the InputT's and AccumT's being merged/accumulated.
+        // Once Spark provides a way to include keys in the arguments of combine/merge functions,
+        // we won't need to duplicate the keys anymore.
 
         // Key has to be windowed in order to group by window as well.
-        JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
+        JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
             inRdd.flatMapToPair(
-                new PairFlatMapFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
-                    WindowedValue<KV<K, VI>>>() {
+                new PairFlatMapFunction<WindowedValue<KV<K, InputT>>, WindowedValue<K>,
+                    WindowedValue<KV<K, InputT>>>() {
                   @Override
                   public Iterable<Tuple2<WindowedValue<K>,
-                      WindowedValue<KV<K, VI>>>> call(WindowedValue<KV<K, VI>> kv) {
+                      WindowedValue<KV<K, InputT>>>> call(
+                          WindowedValue<KV<K, InputT>> kv) {
                       List<Tuple2<WindowedValue<K>,
-                          WindowedValue<KV<K, VI>>>> tuple2s =
+                          WindowedValue<KV<K, InputT>>>> tuple2s =
                           Lists.newArrayListWithCapacity(kv.getWindows().size());
                       for (BoundedWindow boundedWindow: kv.getWindows()) {
                         WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
@@ -289,10 +305,10 @@ public final class TransformTranslator {
         final WindowedValue.FullWindowedValueCoder<K> wkCoder =
                 WindowedValue.FullWindowedValueCoder.of(keyCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-        final WindowedValue.FullWindowedValueCoder<KV<K, VI>> wkviCoder =
+        final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
                 WindowedValue.FullWindowedValueCoder.of(kviCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
-        final WindowedValue.FullWindowedValueCoder<KV<K, VA>> wkvaCoder =
+        final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
                 WindowedValue.FullWindowedValueCoder.of(kvaCoder,
                 context.getInput(transform).getWindowingStrategy().getWindowFn().windowCoder());
 
@@ -301,58 +317,69 @@ public final class TransformTranslator {
         JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes = inRddDuplicatedKeyPair
             .mapToPair(CoderHelpers.toByteFunction(wkCoder, wkviCoder));
 
-        // The output of combineByKey will be "VA" (accumulator) types rather than "VO" (final
-        // output types) since Combine.CombineFn only provides ways to merge VAs, and no way
-        // to merge VOs.
-        JavaPairRDD</*K*/ ByteArray, /*KV<K, VA>*/ byte[]> accumulatedBytes =
+        // The output of combineByKey will be "AccumT" (accumulator) types
+        // rather than "OutputT" (final output) types, since Combine.CombineFn
+        // only provides ways to merge AccumTs, and no way to merge OutputTs.
+        JavaPairRDD</*K*/ ByteArray, /*KV<K, AccumT>*/ byte[]> accumulatedBytes =
             inRddDuplicatedKeyPairBytes.combineByKey(
-            new Function</*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+            new Function</*KV<K, InputT>*/ byte[], /*KV<K, AccumT>*/ byte[]>() {
               @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VI>*/ byte[] input) {
-                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-                VA va = keyed.createAccumulator(wkvi.getValue().getKey());
+              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, InputT>*/ byte[] input) {
+                WindowedValue<KV<K, InputT>> wkvi =
+                    CoderHelpers.fromByteArray(input, wkviCoder);
+                AccumT va = keyed.createAccumulator(wkvi.getValue().getKey());
                 va = keyed.addInput(wkvi.getValue().getKey(), va, wkvi.getValue().getValue());
-                WindowedValue<KV<K, VA>> wkva =
+                WindowedValue<KV<K, AccumT>> wkva =
                     WindowedValue.of(KV.of(wkvi.getValue().getKey(), va), wkvi.getTimestamp(),
                     wkvi.getWindows(), wkvi.getPane());
                 return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             },
-            new Function2</*KV<K, VA>*/ byte[], /*KV<K, VI>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+            new Function2</*KV<K, AccumT>*/ byte[],
+                /*KV<K, InputT>*/ byte[],
+                /*KV<K, AccumT>*/ byte[]>() {
               @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc,
-                  /*KV<K, VI>*/ byte[] input) {
-                WindowedValue<KV<K, VA>> wkva = CoderHelpers.fromByteArray(acc, wkvaCoder);
-                WindowedValue<KV<K, VI>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-                VA va = keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
+              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc,
+                  /*KV<K, InputT>*/ byte[] input) {
+                WindowedValue<KV<K, AccumT>> wkva =
+                    CoderHelpers.fromByteArray(acc, wkvaCoder);
+                WindowedValue<KV<K, InputT>> wkvi =
+                    CoderHelpers.fromByteArray(input, wkviCoder);
+                AccumT va =
+                    keyed.addInput(wkva.getValue().getKey(), wkva.getValue().getValue(),
                     wkvi.getValue().getValue());
                 wkva = WindowedValue.of(KV.of(wkva.getValue().getKey(), va), wkva.getTimestamp(),
                     wkva.getWindows(), wkva.getPane());
                 return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             },
-            new Function2</*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[], /*KV<K, VA>*/ byte[]>() {
+            new Function2</*KV<K, AccumT>*/ byte[],
+                /*KV<K, AccumT>*/ byte[],
+                /*KV<K, AccumT>*/ byte[]>() {
               @Override
-              public /*KV<K, VA>*/ byte[] call(/*KV<K, VA>*/ byte[] acc1,
-                  /*KV<K, VA>*/ byte[] acc2) {
-                WindowedValue<KV<K, VA>> wkva1 = CoderHelpers.fromByteArray(acc1, wkvaCoder);
-                WindowedValue<KV<K, VA>> wkva2 = CoderHelpers.fromByteArray(acc2, wkvaCoder);
-                VA va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
+              public /*KV<K, AccumT>*/ byte[] call(/*KV<K, AccumT>*/ byte[] acc1,
+                  /*KV<K, AccumT>*/ byte[] acc2) {
+                WindowedValue<KV<K, AccumT>> wkva1 =
+                    CoderHelpers.fromByteArray(acc1, wkvaCoder);
+                WindowedValue<KV<K, AccumT>> wkva2 =
+                    CoderHelpers.fromByteArray(acc2, wkvaCoder);
+                AccumT va = keyed.mergeAccumulators(wkva1.getValue().getKey(),
                     // don't use Guava's ImmutableList.of as values may be null
                     Collections.unmodifiableList(Arrays.asList(wkva1.getValue().getValue(),
                     wkva2.getValue().getValue())));
-                WindowedValue<KV<K, VA>> wkva = WindowedValue.of(KV.of(wkva1.getValue().getKey(),
+                WindowedValue<KV<K, AccumT>> wkva = WindowedValue.of(
+                    KV.of(wkva1.getValue().getKey(), va),
+                    wkva1.getTimestamp(), wkva1.getWindows(), wkva1.getPane());
                 return CoderHelpers.toByteArray(wkva, wkvaCoder);
               }
             });
 
-        JavaPairRDD<WindowedValue<K>, WindowedValue<VO>> extracted = accumulatedBytes
+        JavaPairRDD<WindowedValue<K>, WindowedValue<OutputT>> extracted = accumulatedBytes
             .mapToPair(CoderHelpers.fromByteFunction(wkCoder, wkvaCoder))
             .mapValues(
-                new Function<WindowedValue<KV<K, VA>>, WindowedValue<VO>>() {
+                new Function<WindowedValue<KV<K, AccumT>>, WindowedValue<OutputT>>() {
                   @Override
-                  public WindowedValue<VO> call(WindowedValue<KV<K, VA>> acc) {
+                  public WindowedValue<OutputT> call(WindowedValue<KV<K, AccumT>> acc) {
                     return WindowedValue.of(keyed.extractOutput(acc.getValue().getKey(),
                         acc.getValue().getValue()), acc.getTimestamp(),
                         acc.getWindows(), acc.getPane());
@@ -361,12 +388,14 @@ public final class TransformTranslator {
 
         context.setOutputRDD(transform,
             fromPair(extracted)
-            .map(new Function<KV<WindowedValue<K>, WindowedValue<VO>>, WindowedValue<KV<K, VO>>>() {
+            .map(new Function<KV<WindowedValue<K>, WindowedValue<OutputT>>,
+                WindowedValue<KV<K, OutputT>>>() {
               @Override
-              public WindowedValue<KV<K, VO>> call(KV<WindowedValue<K>, WindowedValue<VO>> kwvo)
+              public WindowedValue<KV<K, OutputT>> call(
+                  KV<WindowedValue<K>, WindowedValue<OutputT>> kwvo)
                   throws Exception {
-                WindowedValue<VO> wvo = kwvo.getValue();
-                KV<K, VO> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
+                WindowedValue<OutputT> wvo = kwvo.getValue();
+                KV<K, OutputT> kvo = KV.of(kwvo.getKey().getValue(), wvo.getValue());
                 return WindowedValue.of(kvo, wvo.getTimestamp(), wvo.getWindows(), wvo.getPane());
               }
             }));
@@ -374,18 +403,20 @@ public final class TransformTranslator {
     };
   }
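
Two design points in combinePerKey are worth spelling out: the grouping key is
a WindowedValue<K> rather than a bare K so that combineByKey groups by window
as well as by key, and K is duplicated into the value because Spark's
combineByKey never passes the key to its callbacks. A minimal sketch of those
three callbacks (a hypothetical JavaPairRDD<String, Integer> named "pairs",
not part of this patch) shows that no key parameter is available anywhere:

    // Hypothetical illustration only; "pairs" is an assumed input RDD.
    JavaPairRDD<String, List<Integer>> combined = pairs.combineByKey(
        new Function<Integer, List<Integer>>() {              // createCombiner
          @Override
          public List<Integer> call(Integer v) {
            List<Integer> acc = new ArrayList<>();
            acc.add(v);
            return acc;
          }
        },
        new Function2<List<Integer>, Integer, List<Integer>>() {  // mergeValue
          @Override
          public List<Integer> call(List<Integer> acc, Integer v) {
            acc.add(v);
            return acc;
          }
        },
        new Function2<List<Integer>, List<Integer>, List<Integer>>() {  // mergeCombiners
          @Override
          public List<Integer> call(List<Integer> a1, List<Integer> a2) {
            a1.addAll(a2);
            return a1;
          }
        });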
 
-  private static final class KVFunction<K, VI, VO>
-      implements Function<WindowedValue<KV<K, Iterable<VI>>>, WindowedValue<KV<K, VO>>> {
-    private final Combine.KeyedCombineFn<K, VI, ?, VO> keyed;
+  private static final class KVFunction<K, InputT, OutputT>
+      implements Function<WindowedValue<KV<K, Iterable<InputT>>>,
+      WindowedValue<KV<K, OutputT>>> {
+    private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed;
 
-     KVFunction(Combine.KeyedCombineFn<K, VI, ?, VO> keyed) {
+    KVFunction(Combine.KeyedCombineFn<K, InputT, ?, OutputT> keyed) {
       this.keyed = keyed;
     }
 
     @Override
-    public WindowedValue<KV<K, VO>> call(WindowedValue<KV<K, Iterable<VI>>> windowedKv)
+    public WindowedValue<KV<K, OutputT>> call(
+        WindowedValue<KV<K, Iterable<InputT>>> windowedKv)
         throws Exception {
-      KV<K, Iterable<VI>> kv = windowedKv.getValue();
+      KV<K, Iterable<InputT>> kv = windowedKv.getValue();
       return WindowedValue.of(KV.of(kv.getKey(), keyed.apply(kv.getKey(), kv.getValue())),
           windowedKv.getTimestamp(), windowedKv.getWindows(), windowedKv.getPane());
     }
@@ -409,17 +440,17 @@ public final class TransformTranslator {
     });
   }
 
-  private static <I, O> TransformEvaluator<ParDo.Bound<I, O>> parDo() {
-    return new TransformEvaluator<ParDo.Bound<I, O>>() {
+  private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
+    return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
-      public void evaluate(ParDo.Bound<I, O> transform, EvaluationContext context) {
-        DoFnFunction<I, O> dofn =
+      public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+        DoFnFunction<InputT, OutputT> dofn =
             new DoFnFunction<>(transform.getFn(),
                 context.getRuntimeContext(),
                 getSideInputs(transform.getSideInputs(), context));
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
         context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
       }
     };
@@ -427,20 +458,20 @@ public final class TransformTranslator {
 
   private static final FieldGetter MULTIDO_FG = new FieldGetter(ParDo.BoundMulti.class);
 
-  private static <I, O> TransformEvaluator<ParDo.BoundMulti<I, O>> multiDo() {
-    return new TransformEvaluator<ParDo.BoundMulti<I, O>>() {
+  private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> multiDo() {
+    return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
-      public void evaluate(ParDo.BoundMulti<I, O> transform, EvaluationContext context) {
-        TupleTag<O> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
-        MultiDoFnFunction<I, O> multifn = new MultiDoFnFunction<>(
+      public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        TupleTag<OutputT> mainOutputTag = MULTIDO_FG.get("mainOutputTag", transform);
+        MultiDoFnFunction<InputT, OutputT> multifn = new MultiDoFnFunction<>(
             transform.getFn(),
             context.getRuntimeContext(),
             mainOutputTag,
             getSideInputs(transform.getSideInputs(), context));
 
         @SuppressWarnings("unchecked")
-        JavaRDDLike<WindowedValue<I>, ?> inRDD =
-            (JavaRDDLike<WindowedValue<I>, ?>) context.getInputRDD(transform);
+        JavaRDDLike<WindowedValue<InputT>, ?> inRDD =
+            (JavaRDDLike<WindowedValue<InputT>, ?>) context.getInputRDD(transform);
         JavaPairRDD<TupleTag<?>, WindowedValue<?>> all = inRDD
             .mapPartitionsToPair(multifn)
             .cache();
@@ -716,10 +747,12 @@ public final class TransformTranslator {
     };
   }
 
-  private static <R, W> TransformEvaluator<View.CreatePCollectionView<R, W>> createPCollView() {
-    return new TransformEvaluator<View.CreatePCollectionView<R, W>>() {
+  private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
+  createPCollView() {
+    return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
       @Override
-      public void evaluate(View.CreatePCollectionView<R, W> transform, EvaluationContext context) {
+      public void evaluate(View.CreatePCollectionView<ReadT, WriteT> transform,
+                           EvaluationContext context) {
         Iterable<? extends WindowedValue<?>> iter =
             context.getWindowedValues(context.getInput(transform));
         context.setPView(context.getOutput(transform), iter);
@@ -787,10 +820,11 @@ public final class TransformTranslator {
     EVALUATORS.put(Window.Bound.class, window());
   }
 
-  public static <PT extends PTransform<?, ?>> TransformEvaluator<PT>
-  getTransformEvaluator(Class<PT> clazz) {
+  public static <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT>
+  getTransformEvaluator(Class<TransformT> clazz) {
     @SuppressWarnings("unchecked")
-    TransformEvaluator<PT> transform = (TransformEvaluator<PT>) EVALUATORS.get(clazz);
+    TransformEvaluator<TransformT> transform =
+        (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
     if (transform == null) {
       throw new IllegalStateException("No TransformEvaluator registered for " + clazz);
     }
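
The EVALUATORS map consulted above is a plain class-keyed registry populated
in the static initializer, so translating a pipeline node is a single lookup
plus an unchecked cast. A hypothetical caller (instance names assumed, not
part of this patch):

    // Hypothetical illustration only.
    TransformEvaluator<Window.Bound> evaluator =
        TransformTranslator.getTransformEvaluator(Window.Bound.class);
    evaluator.evaluate(windowTransform, evaluationContext);
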
@@ -808,7 +842,8 @@ public final class TransformTranslator {
     }
 
     @Override
-    public <PT extends PTransform<?, ?>> TransformEvaluator<PT> translate(Class<PT> clazz) {
+    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> translate(
+        Class<TransformT> clazz) {
       return getTransformEvaluator(clazz);
     }
   }