You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:26 UTC

[08/39] incubator-beam git commit: BEAM-261 Enable checkstyle and cleanup.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
new file mode 100644
index 0000000..ecb0adb
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.functions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 6ee82ea..3188dfa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -18,6 +18,17 @@
 
 package org.apache.beam.runners.apex.translators.io;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
@@ -26,27 +37,15 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.google.common.base.Throwables;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.IOException;
-
 /**
  * Apex input operator that wraps Beam {@link UnboundedSource}.
  */
-public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    implements InputOperator {
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
+    extends UnboundedSource.CheckpointMark> implements InputOperator {
   private static final Logger LOG = LoggerFactory.getLogger(
       ApexReadUnboundedInputOperator.class);
   private boolean traceTuples = false;
@@ -58,10 +57,12 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
   private final UnboundedSource<OutputT, CheckpointMarkT> source;
   private transient UnboundedSource.UnboundedReader<OutputT> reader;
   private transient boolean available = false;
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
+      new DefaultOutputPort<>();
 
-  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      ApexPipelineOptions options) {
     this.pipelineOptions = new SerializablePipelineOptions(options);
     this.source = source;
   }
@@ -72,8 +73,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
   }
 
   @Override
-  public void beginWindow(long windowId)
-  {
+  public void beginWindow(long windowId) {
     if (!available && source instanceof ValuesSource) {
       // if it's a Create and the input was consumed, emit final watermark
       emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
@@ -95,37 +95,33 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
   }
 
   @Override
-  public void endWindow()
-  {
+  public void endWindow() {
   }
 
   @Override
-  public void setup(OperatorContext context)
-  {
+  public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
     try {
       reader = source.createReader(this.pipelineOptions.get(), null);
       available = reader.start();
     } catch (IOException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public void teardown()
-  {
+  public void teardown() {
     try {
       if (reader != null) {
         reader.close();
       }
     } catch (IOException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public void emitTuples()
-  {
+  public void emitTuples() {
     try {
       if (!available) {
         available = reader.advance();
@@ -141,7 +137,8 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
             data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
       }
     } catch (Exception e) {
-      Throwables.propagate(e);
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
index 2c4b298..fadf8ec4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
@@ -18,16 +18,6 @@
 
 package org.apache.beam.runners.apex.translators.io;
 
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import com.google.common.base.Throwables;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -37,8 +27,15 @@ import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
 /**
- * unbounded source that reads from a Java {@link Iterable}.
+ * Unbounded source that reads from a Java {@link Iterable}.
  */
 public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
   private static final long serialVersionUID = 1L;
@@ -52,7 +49,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
     try {
       iterableCoder.encode(values, bos, Context.OUTER);
     } catch (IOException ex) {
-      Throwables.propagate(ex);
+      throw new RuntimeException(ex);
     }
     this.codedValues = bos.toByteArray();
   }
@@ -71,7 +68,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
       Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
       return new ValuesReader<>(values, this);
     } catch (IOException ex) {
-      throw Throwables.propagate(ex);
+      throw new RuntimeException(ex);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
new file mode 100644
index 0000000..0d17f19
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
new file mode 100644
index 0000000..7d7c6cc
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index c9bf6dc..a260a66 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.apex.translators.utils;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.datatorrent.api.Operator;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,24 +34,25 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StandardCoder;
 
-import com.datatorrent.api.Operator;
-
-public interface ApexStreamTuple<T>
-{
+/**
+ * The common interface for all objects transmitted through streams.
+ *
+ * @param <T> The actual payload type.
+ */
+public interface ApexStreamTuple<T> {
   /**
-   * Gets the value of the tuple
+   * Gets the value of the tuple.
    *
    * @return
    */
   T getValue();
 
   /**
-   * Plain tuple class
+   * Data tuple class.
    *
    * @param <T>
    */
-  class DataTuple<T> implements ApexStreamTuple<T>
-  {
+  class DataTuple<T> implements ApexStreamTuple<T> {
     private int unionTag;
     private T value;
 
@@ -57,86 +60,73 @@ public interface ApexStreamTuple<T>
       return new DataTuple<>(value, 0);
     }
 
-    private DataTuple(T value, int unionTag)
-    {
+    private DataTuple(T value, int unionTag) {
       this.value = value;
       this.unionTag = unionTag;
     }
 
     @Override
-    public T getValue()
-    {
+    public T getValue() {
       return value;
     }
 
-    public void setValue(T value)
-    {
+    public void setValue(T value) {
       this.value = value;
     }
 
-    public int getUnionTag()
-    {
+    public int getUnionTag() {
       return unionTag;
     }
 
-    public void setUnionTag(int unionTag)
-    {
+    public void setUnionTag(int unionTag) {
       this.unionTag = unionTag;
     }
 
     @Override
-    public String toString()
-    {
+    public String toString() {
       return value.toString();
     }
 
   }
 
   /**
-   * Tuple that includes a timestamp
+   * Tuple that includes a timestamp.
    *
    * @param <T>
    */
-  class TimestampedTuple<T> extends DataTuple<T>
-  {
+  class TimestampedTuple<T> extends DataTuple<T> {
     private long timestamp;
 
-    public TimestampedTuple(long timestamp, T value)
-    {
+    public TimestampedTuple(long timestamp, T value) {
       super(value, 0);
       this.timestamp = timestamp;
     }
 
-    public long getTimestamp()
-    {
+    public long getTimestamp() {
       return timestamp;
     }
 
-    public void setTimestamp(long timestamp)
-    {
+    public void setTimestamp(long timestamp) {
       this.timestamp = timestamp;
     }
   }
 
   /**
-   * Tuple that represents a watermark
+   * Tuple that represents a watermark.
    *
    * @param <T>
    */
-  class WatermarkTuple<T> extends TimestampedTuple<T>
-  {
+  class WatermarkTuple<T> extends TimestampedTuple<T> {
     public static <T> WatermarkTuple<T> of(long timestamp) {
       return new WatermarkTuple<>(timestamp);
     }
 
-    protected WatermarkTuple(long timestamp)
-    {
+    protected WatermarkTuple(long timestamp) {
       super(timestamp, null);
     }
 
     @Override
-    public String toString()
-    {
+    public String toString() {
       return "[Watermark " + getTimestamp() + "]";
     }
   }
@@ -161,18 +151,17 @@ public interface ApexStreamTuple<T>
         throws CoderException, IOException {
       if (value instanceof WatermarkTuple) {
         outStream.write(1);
-        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>)value).getTimestamp());
+        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
       } else {
         outStream.write(0);
-        outStream.write(((DataTuple<?>)value).unionTag);
+        outStream.write(((DataTuple<?>) value).unionTag);
         valueCoder.encode(value.getValue(), outStream, context);
       }
     }
 
     @Override
     public ApexStreamTuple<T> decode(InputStream inStream, Context context)
-        throws CoderException, IOException
-    {
+        throws CoderException, IOException {
       int b = inStream.read();
       if (b == 1) {
         return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
@@ -183,14 +172,12 @@ public interface ApexStreamTuple<T>
     }
 
     @Override
-    public List<? extends Coder<?>> getCoderArguments()
-    {
+    public List<? extends Coder<?>> getCoderArguments() {
       return Arrays.<Coder<?>>asList(valueCoder);
     }
 
     @Override
-    public void verifyDeterministic() throws NonDeterministicException
-    {
+    public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
           this.getClass().getSimpleName() + " requires a deterministic valueCoder",
           valueCoder);
@@ -205,10 +192,12 @@ public interface ApexStreamTuple<T>
 
   }
 
-  final class Logging
-  {
-    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator)
-    {
+  /**
+   * Central if data tuples received on and emitted from ports should be logged.
+   * Should be called in setup and value cached in operator.
+   */
+  final class Logging {
+    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
       return options.isTupleTracingEnabled();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
index c18765b..61e3b83 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.apex.translators.utils;
 
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -25,15 +28,10 @@ import java.io.Serializable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-import com.google.common.base.Throwables;
-
 /**
  * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
  */
 public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
-
   private static final long serialVersionUID = 1L;
   private final Coder<? super Object> coder;
 
@@ -42,31 +40,29 @@ public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializabl
   }
 
   @Override
-  public Object fromByteArray(Slice fragment)
-  {
-    ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+  public Object fromByteArray(Slice fragment) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset,
+        fragment.length);
     try {
       return coder.decode(bis, Context.OUTER);
     } catch (IOException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public Slice toByteArray(Object wv)
-  {
+  public Slice toByteArray(Object wv) {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     try {
       coder.encode(wv, bos, Context.OUTER);
     } catch (IOException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
     return new Slice(bos.toByteArray());
   }
 
   @Override
-  public int getPartition(Object o)
-  {
+  public int getPartition(Object o) {
     return o.hashCode();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
index 43d92f6..3b19c37 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.apex.translators.utils;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ExecutionContext;
@@ -25,14 +28,10 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.io.IOException;
-import java.io.Serializable;
-
 /**
  * Serializable {@link ExecutionContext.StepContext} that does nothing.
  */
 public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
-
   private static final long serialVersionUID = 1L;
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
index 7f7b3ef..d32b869 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.apex.translators.utils;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -25,37 +27,34 @@ import java.io.ObjectOutput;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 /**
- * A wrapper to enable serialization of {@link PipelineOptions} 
+ * A wrapper to enable serialization of {@link PipelineOptions}.
  */
 public class SerializablePipelineOptions implements Externalizable {
 
   private transient ApexPipelineOptions pipelineOptions;
-  
+
   public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
   }
 
   public SerializablePipelineOptions() {
   }
-  
+
   public ApexPipelineOptions get() {
     return this.pipelineOptions;
   }
-  
+
   @Override
-  public void writeExternal(ObjectOutput out) throws IOException
-  {
+  public void writeExternal(ObjectOutput out) throws IOException {
     out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
   }
 
   @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
-  {
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     String s = in.readUTF();
-    this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class).as(ApexPipelineOptions.class);
+    this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
+        .as(ApexPipelineOptions.class);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
index 2de737d..c06c500 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
@@ -17,26 +17,24 @@
  */
 package org.apache.beam.runners.apex.translators.utils;
 
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoSerializable;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
 
 
 /**
  * A {@link KryoSerializable} holder that uses the specified {@link Coder}.
  * @param <T>
  */
-public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
-{
-  private static JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
+  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
   private T value;
   private Coder<T> coder;
 
@@ -54,27 +52,25 @@ public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
   }
 
   @Override
-  public void write(Kryo kryo, Output output)
-  {
+  public void write(Kryo kryo, Output output) {
     try {
       kryo.writeClass(output, coder.getClass());
       kryo.writeObject(output, coder, JAVA_SERIALIZER);
       coder.encode(value, output, Context.OUTER);
     } catch (IOException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public void read(Kryo kryo, Input input)
-  {
+  public void read(Kryo kryo, Input input) {
     try {
       @SuppressWarnings("unchecked")
       Class<Coder<T>> type = kryo.readClass(input).getType();
       coder = kryo.readObject(input, type, JAVA_SERIALIZER);
       value = coder.decode(input, Context.OUTER);
     } catch (IOException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
new file mode 100644
index 0000000..4aeba35
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.utils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
deleted file mode 100644
index 3573d31..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.examples;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * For debugging only.
- */
-@Ignore
-@RunWith(JUnit4.class)
-public class IntTest implements java.io.Serializable
-{
-
-  @Test
-  public void test()
-  {
-    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
-    options.setTupleTracingEnabled(true);
-    options.setRunner(TestApexRunner.class);
-    Pipeline p = Pipeline.create(options);
-boolean timeBound = false;
-
-
-  TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
-//List<KV<Integer,Integer>> values = Lists.newArrayList(
-//    KV.of(0, 99),KV.of(0, 99),KV.of(0, 98));
-
-//UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values,
-//   KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
-
-  if (true) {
-      source = source.withDedup();
-    }
-
-    PCollection<KV<Integer, Integer>> output =
-        timeBound
-        ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200)))
-         : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS));
-
-    List<KV<Integer, Integer>> expectedOutput = new ArrayList<>();
-    for (int i = 0; i < NUM_RECORDS; i++) {
-      expectedOutput.add(KV.of(0, i));
-    }
-
-    // Because some of the NUM_RECORDS elements read are dupes, the final output
-    // will only have output from 0 to n where n < NUM_RECORDS.
-    PAssert.that(output).satisfies(new Checker(true, timeBound));
-
-
-    p.run();
-    return;
-  }
-
-  private static final int NUM_RECORDS = 10;
-  private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void>
-  {
-    private final boolean dedup;
-    private final boolean timeBound;
-
-    Checker(boolean dedup, boolean timeBound)
-    {
-      this.dedup = dedup;
-      this.timeBound = timeBound;
-    }
-
-    @Override
-    public Void apply(Iterable<KV<Integer, Integer>> input)
-    {
-      List<Integer> values = new ArrayList<>();
-      for (KV<Integer, Integer> kv : input) {
-        assertEquals(0, (int)kv.getKey());
-        values.add(kv.getValue());
-      }
-      if (timeBound) {
-        assertTrue(values.size() >= 1);
-      } else if (dedup) {
-        // Verify that at least some data came through.  The chance of 90% of the input
-        // being duplicates is essentially zero.
-        assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS);
-      } else {
-        assertEquals(NUM_RECORDS, values.size());
-      }
-      Collections.sort(values);
-      for (int i = 0; i < values.size(); i++) {
-        assertEquals(i, (int)values.get(i));
-      }
-      //if (finalizeTracker != null) {
-      //  assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1));
-      //}
-      return null;
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
index 582d839..6ab2e8e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
@@ -21,8 +21,8 @@ package org.apache.beam.runners.apex.examples;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,8 +75,8 @@ public class StreamingWordCountTest {
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue() +
-          " @ " + c.timestamp().toString();
+      String row = c.element().getKey() + " - " + c.element().getValue()
+          + " @ " + c.timestamp().toString();
       LOG.debug("output {}", row);
       c.output(row);
       RESULTS.put(c.element().getKey(), c.element().getValue());
@@ -103,17 +102,19 @@ public class StreamingWordCountTest {
 
     wordCounts.apply(ParDo.of(new FormatAsStringFn()));
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
     Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
     long timeout = System.currentTimeMillis() + 30000;
     while (System.currentTimeMillis() < timeout) {
-      if (FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")) {
+      if (FormatAsStringFn.RESULTS.containsKey("foo")
+          && FormatAsStringFn.RESULTS.containsKey("bar")) {
         break;
       }
       Thread.sleep(1000);
     }
     result.cancel();
-    Assert.assertTrue(FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
+    Assert.assertTrue(
+        FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
     FormatAsStringFn.RESULTS.clear();
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index 29351e9..8132ee5 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -18,14 +18,6 @@
 
 package org.apache.beam.runners.apex.examples;
 
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-
-import com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -34,6 +26,12 @@ import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
 /**
  * unbounded source that reads from text.
  */
@@ -102,7 +100,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
       try {
         Thread.sleep(index); // allow for downstream processing to complete
       } catch (InterruptedException e) {
-        Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
new file mode 100644
index 0000000..4308c80
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.examples;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
index 6b181ba..7defc77 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
@@ -18,9 +18,16 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -30,21 +37,13 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.common.collect.Sets;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
 /**
- * integration test for {@link FlattenPCollectionTranslator}.
+ * Integration test for {@link FlattenPCollectionTranslator}.
  */
 public class FlattenPCollectionTranslatorTest {
   private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class);
@@ -70,29 +69,30 @@ public class FlattenPCollectionTranslatorTest {
     PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections());
     actual.apply(ParDo.of(new EmbeddedCollector()));
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
     // TODO: verify translation
     result.getApexDAG();
     long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout && EmbeddedCollector.results.size() < expected.size()) {
+    while (System.currentTimeMillis() < timeout
+        && EmbeddedCollector.RESULTS.size() < expected.size()) {
       LOG.info("Waiting for expected results.");
       Thread.sleep(500);
     }
 
-    Assert.assertEquals("number results", expected.size(), EmbeddedCollector.results.size());
-    Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.results));
+    Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
+    Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
   }
 
   @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final ArrayList<Object> results = new ArrayList<>();
+    protected static final ArrayList<Object> RESULTS = new ArrayList<>();
 
     public EmbeddedCollector() {
     }
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      results.add(c.element());
+      RESULTS.add(c.element());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
index e4d4606..cb764d6 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
@@ -18,9 +18,22 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -35,28 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
-
-import com.datatorrent.api.DAG;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
 /**
- * integration test for {@link GroupByKeyTranslator}.
+ * Integration test for {@link GroupByKeyTranslator}.
  */
 public class GroupByKeyTranslatorTest {
 
@@ -94,31 +92,30 @@ public class GroupByKeyTranslatorTest {
         .apply(ParDo.of(new EmbeddedCollector()))
         ;
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
-    // TODO: verify translation
-    DAG dag = result.getApexDAG();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    result.getApexDAG();
 
     long timeout = System.currentTimeMillis() + 30000;
     while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.results.containsAll(expected)) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
         break;
       }
       Thread.sleep(1000);
     }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
 
   }
 
   @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> results = new HashSet<>();
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
 
     public EmbeddedCollector() {
     }
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      results.add(c.element());
+      RESULTS.add(c.element());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index b9748ee..ad22acd 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -21,6 +21,11 @@ package org.apache.beam.runners.apex.translators;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -56,11 +61,6 @@ import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * integration test for {@link ParDoBoundTranslator}.
  */
@@ -83,7 +83,7 @@ public class ParDoBoundTranslatorTest {
         .apply(ParDo.of(new Add(5)))
         .apply(ParDo.of(new EmbeddedCollector()));
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
     DAG dag = result.getApexDAG();
 
     DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values");
@@ -96,13 +96,13 @@ public class ParDoBoundTranslatorTest {
 
     long timeout = System.currentTimeMillis() + 30000;
     while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.results.containsAll(expected)) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
         break;
       }
       LOG.info("Waiting for expected results.");
       Thread.sleep(1000);
     }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
   @SuppressWarnings("serial")
@@ -121,14 +121,14 @@ public class ParDoBoundTranslatorTest {
 
   @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> results = new HashSet<>();
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
 
     public EmbeddedCollector() {
     }
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      results.add(c.element());
+      RESULTS.add(c.element());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
index f954537..71c5354 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
@@ -18,9 +18,20 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import com.datatorrent.api.DAG;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
 import org.apache.beam.runners.apex.translators.utils.CollectionSource;
 import org.apache.beam.sdk.Pipeline;
@@ -30,23 +41,11 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-
-import com.datatorrent.api.DAG;
-import com.google.common.collect.ContiguousSet;
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 /**
  * integration test for {@link ReadUnboundedTranslator}.
  */
@@ -57,7 +56,7 @@ public class ReadUnboundTranslatorTest {
   public void test() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
-    EmbeddedCollector.results.clear();
+    EmbeddedCollector.RESULTS.clear();
     options.setApplicationName("ReadUnbound");
     options.setRunner(ApexRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -67,7 +66,7 @@ public class ReadUnboundTranslatorTest {
     p.apply(Read.from(source))
         .apply(ParDo.of(new EmbeddedCollector()));
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
     DAG dag = result.getApexDAG();
     DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)");
     Assert.assertNotNull(om);
@@ -75,20 +74,20 @@ public class ReadUnboundTranslatorTest {
 
     long timeout = System.currentTimeMillis() + 30000;
     while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.results.containsAll(collection)) {
+      if (EmbeddedCollector.RESULTS.containsAll(collection)) {
         break;
       }
       LOG.info("Waiting for expected results.");
       Thread.sleep(1000);
     }
-    Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.results);
+    Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS);
   }
 
   @Test
   public void testReadBounded() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
-    EmbeddedCollector.results.clear();
+    EmbeddedCollector.RESULTS.clear();
     options.setApplicationName("ReadBounded");
     options.setRunner(ApexRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -97,7 +96,7 @@ public class ReadUnboundTranslatorTest {
     p.apply(Read.from(CountingSource.upTo(10)))
         .apply(ParDo.of(new EmbeddedCollector()));
 
-    ApexRunnerResult result = (ApexRunnerResult)p.run();
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
     DAG dag = result.getApexDAG();
     DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
     Assert.assertNotNull(om);
@@ -105,25 +104,25 @@ public class ReadUnboundTranslatorTest {
 
     long timeout = System.currentTimeMillis() + 30000;
     while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.results.containsAll(expected)) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
         break;
       }
       LOG.info("Waiting for expected results.");
       Thread.sleep(1000);
     }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
   }
 
   @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> results = new HashSet<>();
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
 
     public EmbeddedCollector() {
     }
 
     @Override
     public void processElement(ProcessContext c) throws Exception {
-      results.add(c.element());
+      RESULTS.add(c.element());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
index a1e8b3e..c368bb2 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
@@ -18,12 +18,6 @@
 
 package org.apache.beam.runners.apex.translators.utils;
 
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
@@ -34,11 +28,16 @@ import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
 /**
  * collection as {@link UnboundedSource}, used for tests.
  */
 public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-
+  private static final long serialVersionUID = 1L;
   private final Collection<T> collection;
   private final Coder<T> coder;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
index e2fa9d9..e67efa9 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
@@ -17,29 +17,31 @@
  */
 package org.apache.beam.runners.apex.translators.utils;
 
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 /**
  * Tests the serialization of PipelineOptions.
  */
 public class PipelineOptionsTest {
 
+  /**
+   * Interface for testing.
+   */
   public interface MyOptions extends ApexPipelineOptions {
     @Description("Bla bla bla")
     @Default.String("Hello")
@@ -60,7 +62,7 @@ public class PipelineOptionsTest {
 
   private static MyOptions options;
 
-  private final static String[] args = new String[]{"--testOption=nothing"};
+  private static final String[] args = new String[]{"--testOption=nothing"};
 
   @BeforeClass
   public static void beforeTest() {
@@ -74,7 +76,7 @@ public class PipelineOptionsTest {
     FSStorageAgent.store(bos, wrapper);
 
     ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    MyOptionsWrapper wrapperCopy = (MyOptionsWrapper)FSStorageAgent.retrieve(bis);
+    MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis);
     assertNotNull(wrapperCopy.options);
     assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties
index c0efc5d..d1e6b44 100644
--- a/runners/apex/src/test/resources/log4j.properties
+++ b/runners/apex/src/test/resources/log4j.properties
@@ -18,16 +18,18 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=DEBUG, testlogger
+log4j.rootLogger=OFF, testlogger
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.testlogger.threshold=${test.log.threshold}
+test.log.threshold=DEBUG
 
-log4j.logger.org=debug
+log4j.logger.org=info
 log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=info
-log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex=info
 log4j.logger.org.apache.beam.runners.apex=debug