You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:27 UTC

[09/39] incubator-beam git commit: BEAM-261 Enable checkstyle and cleanup.

BEAM-261 Enable checkstyle and cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9454b3bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9454b3bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9454b3bd

Branch: refs/heads/master
Commit: 9454b3bdc6f6ff69363dcd339cfb069c2c2f8cc9
Parents: 1ec7cd9
Author: Thomas Weise <th...@apache.org>
Authored: Sun Oct 16 17:36:01 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 17 09:22:49 2016 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   2 -
 .../runners/apex/ApexPipelineTranslator.java    |  39 +--
 .../apache/beam/runners/apex/ApexRunner.java    | 314 +++----------------
 .../beam/runners/apex/ApexRunnerResult.java     |  23 +-
 .../beam/runners/apex/TestApexRunner.java       |   9 +-
 .../apache/beam/runners/apex/package-info.java  |  22 ++
 .../translators/CreateValuesTranslator.java     |  12 +-
 .../FlattenPCollectionTranslator.java           |  13 +-
 .../apex/translators/GroupByKeyTranslator.java  |   4 +-
 .../translators/ParDoBoundMultiTranslator.java  |  47 +--
 .../apex/translators/ParDoBoundTranslator.java  |   5 +-
 .../translators/ReadUnboundedTranslator.java    |   4 +-
 .../apex/translators/TransformTranslator.java   |   8 +-
 .../apex/translators/TranslationContext.java    |  40 +--
 .../functions/ApexFlattenOperator.java          |  42 ++-
 .../functions/ApexGroupByKeyOperator.java       | 155 +++++----
 .../functions/ApexParDoOperator.java            | 140 ++++-----
 .../translators/functions/package-info.java     |  22 ++
 .../io/ApexReadUnboundedInputOperator.java      |  57 ++--
 .../apex/translators/io/ValuesSource.java       |  23 +-
 .../apex/translators/io/package-info.java       |  22 ++
 .../runners/apex/translators/package-info.java  |  22 ++
 .../apex/translators/utils/ApexStreamTuple.java |  85 +++--
 .../utils/CoderAdapterStreamCodec.java          |  24 +-
 .../apex/translators/utils/NoOpStepContext.java |   7 +-
 .../utils/SerializablePipelineOptions.java      |  21 +-
 .../utils/ValueAndCoderKryoSerializable.java    |  26 +-
 .../apex/translators/utils/package-info.java    |  22 ++
 .../beam/runners/apex/examples/IntTest.java     | 133 --------
 .../apex/examples/StreamingWordCountTest.java   |  15 +-
 .../apex/examples/UnboundedTextSource.java      |  16 +-
 .../runners/apex/examples/package-info.java     |  22 ++
 .../FlattenPCollectionTranslatorTest.java       |  32 +-
 .../translators/GroupByKeyTranslatorTest.java   |  45 ++-
 .../translators/ParDoBoundTranslatorTest.java   |  20 +-
 .../translators/ReadUnboundTranslatorTest.java  |  45 ++-
 .../translators/utils/CollectionSource.java     |  13 +-
 .../translators/utils/PipelineOptionsTest.java  |  28 +-
 .../apex/src/test/resources/log4j.properties    |   8 +-
 39 files changed, 662 insertions(+), 925 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 929feb4..8b62410 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -148,12 +148,10 @@
   <build>
     <plugins>
 
-      <!-- Checkstyle errors for now
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
       </plugin>
-      -->
 
       <!-- Integration Tests -->
       <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a16f551..a6857ee 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,11 @@
 
 package org.apache.beam.runners.apex;
 
+import com.datatorrent.api.DAG;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
@@ -43,18 +48,13 @@ import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * {@link ApexPipelineTranslator} translates {@link Pipeline} objects
  * into Apex logical plan {@link DAG}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      ApexPipelineTranslator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
 
   /**
    * A map from {@link PTransform} subclass to the corresponding
@@ -75,8 +75,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     registerTransformTranslator(Flatten.FlattenPCollectionList.class,
         new FlattenPCollectionTranslator());
     registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
-    registerTransformTranslator(CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator());
-    registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator());
+    registerTransformTranslator(CreateApexPCollectionView.class,
+        new CreateApexPCollectionViewTranslator());
+    registerTransformTranslator(CreatePCollectionView.class,
+        new CreatePCollectionViewTranslator());
   }
 
   public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -134,7 +136,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
    * Returns the {@link TransformTranslator} to use for instances of the
    * specified PTransform class, or null if none registered.
    */
-  private <TransformT extends PTransform<?,?>>
+  private <TransformT extends PTransform<?, ?>>
   TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
     return transformTranslators.get(transformClass);
   }
@@ -145,7 +147,8 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     @Override
     public void translate(Read.Bounded<T> transform, TranslationContext context) {
       // TODO: adapter is visibleForTesting
-      BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(transform.getSource());
+      BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
+          transform.getSource());
       ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
           unboundedSource, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
@@ -153,26 +156,26 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
 
   }
 
-  private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
-  {
+  private static class CreateApexPCollectionViewTranslator<ElemT, ViewT>
+      implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context)
-    {
+    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform,
+        TranslationContext context) {
       PCollectionView<ViewT> view = transform.getView();
       context.addView(view);
       LOG.debug("view {}", view.getName());
     }
   }
 
-  private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>>
-  {
+  private static class CreatePCollectionViewTranslator<ElemT, ViewT>
+      implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public void translate(CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context)
-    {
+    public void translate(CreatePCollectionView<ElemT, ViewT> transform,
+        TranslationContext context) {
       PCollectionView<ViewT> view = transform.getView();
       context.addView(view);
       LOG.debug("view {}", view.getName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 667f1c8..f3c44bb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -19,17 +19,18 @@ package org.apache.beam.runners.apex;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.ArrayList;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.base.Throwables;
+
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
@@ -39,31 +40,22 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.hadoop.conf.Configuration;
 
-import com.datatorrent.api.Context.DAGContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.google.common.base.Throwables;
-
 /**
  * A {@link PipelineRunner} that translates the
  * pipeline to an Apex DAG and executes it on an Apex cluster.
- * <p>
- * Currently execution is always in embedded mode,
+ *
+ * <p>Currently execution is always in embedded mode,
  * launch on Hadoop cluster will be added in subsequent iteration.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,37 +91,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
               input.getPipeline(),
               WindowingStrategy.globalDefault(),
               PCollection.IsBounded.BOUNDED);
-// TODO: replace this with a mapping
-////
-
     } else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this,
-          (Combine.GloballyAsSingletonView)transform);
+      PTransform<InputT, OutputT> customTransform = (PTransform)
+          new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(
+              this, (Combine.GloballyAsSingletonView) transform);
       return Pipeline.applyTransform(input, customTransform);
     } else if (View.AsSingleton.class.equals(transform.getClass())) {
-      // note this assumes presence of above Combine.GloballyAsSingletonView mapping
-      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this,
-          (View.AsSingleton)transform);
+      // assumes presence of above Combine.GloballyAsSingletonView mapping
+      PTransform<InputT, OutputT> customTransform = (PTransform)
+          new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) transform);
       return Pipeline.applyTransform(input, customTransform);
-/*
-    } else if (View.AsIterable.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this,
-          (View.AsIterable)transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (View.AsList.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this,
-          (View.AsList)transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (View.AsMap.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this,
-          (View.AsMap)transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (View.AsMultimap.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this,
-          (View.AsMultimap)transform);
-      return Pipeline.applyTransform(input, customTransform);
-*/
-////
     } else {
       return super.apply(transform, input);
     }
@@ -142,17 +113,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext);
     translator.translate(pipeline);
 
-    StreamingApplication apexApp = new StreamingApplication()
-    {
+    StreamingApplication apexApp = new StreamingApplication() {
       @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
+      public void populateDAG(DAG dag, Configuration conf) {
         dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
         translationContext.populateDAG(dag);
       }
     };
 
-    checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time");
+    checkArgument(options.isEmbeddedExecution(),
+        "only embedded execution is supported at this time");
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
     try {
@@ -178,7 +148,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
       }
       return new ApexRunnerResult(lma.getDAG(), lc);
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -231,13 +202,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 // Adapted from FlinkRunner for View support
 
   /**
-   * Records that the {@link PTransform} requires a deterministic key coder.
-   */
-  private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
-    //throw new UnsupportedOperationException();
-  }
-
-  /**
    * Creates a primitive {@link PCollectionView}.
    *
    * <p>For internal use only by runner implementors.
@@ -247,6 +211,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    */
   public static class CreateApexPCollectionView<ElemT, ViewT>
       extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private static final long serialVersionUID = 1L;
     private PCollectionView<ViewT> view;
 
     private CreateApexPCollectionView(PCollectionView<ViewT> view) {
@@ -276,52 +241,50 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>>
-  {
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    private static final long serialVersionUID = 1L;
     Combine.GloballyAsSingletonView<InputT, OutputT> transform;
 
     /**
      * Builds an instance of this class from the overridden transform.
      */
     public StreamingCombineGloballyAsSingletonView(ApexRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform)
-    {
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
       this.transform = transform;
     }
 
     @Override
-    public PCollectionView<OutputT> apply(PCollection<InputT> input)
-    {
+    public PCollectionView<OutputT> apply(PCollection<InputT> input) {
       PCollection<OutputT> combined = input
-          .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout()));
+          .apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults().withFanout(transform.getFanout()));
 
       PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(),
           combined.getWindowingStrategy(), transform.getInsertDefault(),
-          transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder());
+          transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
+              combined.getCoder());
       return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
           .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
     }
 
     @Override
-    protected String getKindString()
-    {
+    protected String getKindString() {
       return "StreamingCombineGloballyAsSingletonView";
     }
   }
 
-  private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>>
-  {
+  private static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
     private static final long serialVersionUID = 1L;
+
     private View.AsSingleton<T> transform;
 
-    public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform)
-    {
+    public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform) {
       this.transform = transform;
     }
 
     @Override
-    public PCollectionView<T> apply(PCollection<T> input)
-    {
+    public PCollectionView<T> apply(PCollection<T> input) {
       Combine.Globally<T, T> combine = Combine
           .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
       if (!transform.hasDefaultValue()) {
@@ -331,33 +294,28 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     @Override
-    protected String getKindString()
-    {
+    protected String getKindString() {
       return "StreamingViewAsSingleton";
     }
 
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T>
-    {
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
       private boolean hasDefaultValue;
       private T defaultValue;
 
-      SingletonCombine(boolean hasDefaultValue, T defaultValue)
-      {
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
         this.hasDefaultValue = hasDefaultValue;
         this.defaultValue = defaultValue;
       }
 
       @Override
-      public T apply(T left, T right)
-      {
+      public T apply(T left, T right) {
         throw new IllegalArgumentException("PCollection with more than one element "
             + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
             + "combine the PCollection into a single value");
       }
 
       @Override
-      public T identity()
-      {
+      public T identity() {
         if (hasDefaultValue) {
           return defaultValue;
         } else {
@@ -368,194 +326,4 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
-  private static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private final ApexRunner runner;
-
-    public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    private final ApexRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> apply(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateApexPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<T>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 6817684..d5613fe 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -17,20 +17,19 @@
  */
 package org.apache.beam.runners.apex;
 
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
 
 import java.io.IOException;
 
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.joda.time.Duration;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-
 /**
  * Result of executing a {@link Pipeline} with Apex in embedded mode.
  */
@@ -56,28 +55,24 @@ public class ApexRunnerResult implements PipelineResult {
   }
 
   @Override
-  public State cancel() throws IOException
-  {
+  public State cancel() throws IOException {
     ctrl.shutdown();
     state = State.CANCELLED;
     return state;
   }
 
   @Override
-  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException
-  {
+  public State waitUntilFinish(Duration duration) throws IOException, InterruptedException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public State waitUntilFinish() throws IOException, InterruptedException
-  {
+  public State waitUntilFinish() throws IOException, InterruptedException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public MetricResults metrics()
-  {
+  public MetricResults metrics() {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index 45c143e..2e048f0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -25,7 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
-
+/**
+ * Apex {@link PipelineRunner} for testing.
+ */
 public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   private ApexRunner delegate;
@@ -38,13 +40,14 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   public static TestApexRunner fromOptions(PipelineOptions options) {
-    ApexPipelineOptions apexOptions = PipelineOptionsValidator.validate(ApexPipelineOptions.class, options);
+    ApexPipelineOptions apexOptions = PipelineOptionsValidator
+        .validate(ApexPipelineOptions.class, options);
     return new TestApexRunner(apexOptions);
   }
 
   @Override
   public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
     return delegate.apply(transform, input);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
new file mode 100644
index 0000000..4d2f417
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
index 7a29057..539f311 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
@@ -25,12 +25,10 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PBegin;
 
-import com.google.common.base.Throwables;
-
 
 /**
  * Wraps elements from Create.Values into an {@link UnboundedSource}.
- * mainly used for test
+ * mainly used for testing
  */
 public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
   private static final long serialVersionUID = 1451000241832745629L;
@@ -39,12 +37,12 @@ public class CreateValuesTranslator<T> implements TransformTranslator<Create.Val
   public void translate(Create.Values<T> transform, TranslationContext context) {
     try {
       UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
-          transform.getDefaultOutputCoder((PBegin)context.getInput()));
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(unboundedSource,
-          context.getPipelineOptions());
+          transform.getDefaultOutputCoder((PBegin) context.getInput()));
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
     } catch (CannotProvideCoderException e) {
-      Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index 6737767..a39aacb 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -32,8 +34,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
-import com.google.common.collect.Lists;
-
 /**
  * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
  */
@@ -72,7 +72,8 @@ public class FlattenPCollectionTranslator<T> implements
    * @param finalCollection
    * @param context
    */
-  static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
+  static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
+      Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
     List<PCollection<T>> remainingCollections = Lists.newArrayList();
     PCollection<T> firstCollection = null;
     while (!collections.isEmpty()) {
@@ -93,7 +94,8 @@ public class FlattenPCollectionTranslator<T> implements
           }
 
           if (collections.size() > 2) {
-            PCollection<T> intermediateCollection = intermediateCollection(collection, collection.getCoder());
+            PCollection<T> intermediateCollection = intermediateCollection(collection,
+                collection.getCoder());
             context.addOperator(operator, operator.out, intermediateCollection);
             remainingCollections.add(intermediateCollection);
           } else {
@@ -118,7 +120,8 @@ public class FlattenPCollectionTranslator<T> implements
   }
 
   static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
-    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+        input.getWindowingStrategy(), input.isBounded());
     output.setCoder(outputCoder);
     return output;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
index 43c82a9..d3e7d2d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
@@ -31,9 +31,9 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
 
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-
     PCollection<KV<K, V>> input = context.getInput();
-    ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), input);
+    ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
+        input);
     context.addOperator(group, group.output);
     context.addStream(input, group.input);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index a229a81..13f07c1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -20,6 +20,10 @@ package org.apache.beam.runners.apex.translators;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Maps;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -27,6 +31,7 @@ import java.util.Map;
 
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,16 +40,16 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * {@link ParDo.BoundMulti} is translated to Apex operator that wraps the {@link DoFn}
+ * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
  */
-public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>>  {
+public class ParDoBoundMultiTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
   private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
@@ -56,7 +61,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
     WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
         input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
         doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
         context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder);
 
@@ -73,36 +79,37 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
     }
   }
 
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, TranslationContext context) {
+  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
     Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
     if (sideInputs.size() > sideInputPorts.length) {
-      //  String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
-      //      transform.toString(), sideInputPorts.length);
-      //  throw new UnsupportedOperationException(msg);
       PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
       context.addStream(unionCollection, sideInputPorts[0]);
     } else {
-      for (int i=0; i<sideInputs.size(); i++) {
-        // the number of input ports for side inputs are fixed and each port can only take one input.
-        // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced.
+      // the number of ports for side inputs is fixed and each port can only take one input.
+      for (int i = 0; i < sideInputs.size(); i++) {
         context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
       }
     }
   }
 
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, TranslationContext context) {
+  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
+      TranslationContext context) {
     checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
     // flatten and assign union tag
     List<PCollection<Object>> sourceCollections = new ArrayList<>();
     Map<PCollection<?>, Integer> unionTags = new HashMap<>();
     PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
-    for (int i=0; i < sideInputs.size(); i++) {
+    for (int i = 0; i < sideInputs.size(); i++) {
       PCollectionView<?> sideInput = sideInputs.get(i);
       PCollection<?> sideInputCollection = context.getViewInput(sideInput);
-      if (!sideInputCollection.getWindowingStrategy().equals(firstSideInput.getWindowingStrategy())) {
+      if (!sideInputCollection.getWindowingStrategy().equals(
+          firstSideInput.getWindowingStrategy())) {
         // TODO: check how to handle this in stream codec
         //String msg = "Multiple side inputs with different window strategies.";
         //throw new UnsupportedOperationException(msg);
+        LOG.warn("Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
       }
       if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
         String msg = "Multiple side inputs with different coders.";
@@ -112,8 +119,10 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
       unionTags.put(sideInputCollection, i);
     }
 
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, context);
+    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
+        firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
+        context);
     return resultCollection;
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index 7749a06..bd7115e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
 /**
- * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn}
+ * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
  */
 public class ParDoBoundTranslator<InputT, OutputT> implements
     TransformTranslator<ParDo.Bound<InputT, OutputT>> {
@@ -49,7 +49,8 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
     WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
         input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+        context.getPipelineOptions(),
         doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
         output.getWindowingStrategy(), sideInputs, wvInputCoder);
     context.addOperator(operator, operator.output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
index b53e4dd..3097276 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import com.datatorrent.api.InputOperator;
+
 import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 
-import com.datatorrent.api.InputOperator;
-
 /**
  * {@link Read.Unbounded} is translated to Apex {@link InputOperator}
  * that wraps {@link UnboundedSource}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
index 1a99885..dfd2045 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
@@ -19,13 +19,13 @@
 package org.apache.beam.runners.apex.translators;
 
 
-import org.apache.beam.sdk.transforms.PTransform;
-
 import java.io.Serializable;
 
+import org.apache.beam.sdk.transforms.PTransform;
+
 /**
- * translates {@link PTransform} to Apex functions.
+ * Translates {@link PTransform} to Apex functions.
  */
-public interface TransformTranslator<T extends PTransform<?,?>> extends Serializable {
+public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
   void translate(T transform, TranslationContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index bd44a20..ddacc29 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -19,6 +19,17 @@ package org.apache.beam.runners.apex.translators;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
@@ -34,17 +45,6 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Maintains context data for {@link TransformTranslator}s.
  */
@@ -64,7 +64,7 @@ public class TranslationContext {
   public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
     PInput input = this.viewInputs.get(view);
     checkArgument(input != null, "unknown view " + view.getName());
-    return (InputT)input;
+    return (InputT) input;
   }
 
   public TranslationContext(ApexPipelineOptions pipelineOptions) {
@@ -109,13 +109,14 @@ public class TranslationContext {
         addOperator(operator, portEntry.getValue(), portEntry.getKey());
         first = false;
       } else {
-        this.streams.put(portEntry.getKey(), (Pair)new ImmutablePair<>(portEntry.getValue(), new ArrayList<>()));
+        this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
+            new ArrayList<>()));
       }
     }
   }
 
   /**
-   * Add intermediate operator for the current transformation.
+   * Add the operator with its output port for the given result {link PCollection}.
    * @param operator
    * @param port
    * @param output
@@ -124,9 +125,11 @@ public class TranslationContext {
     // Apex DAG requires a unique operator name
     // use the transform's name and make it unique
     String name = getCurrentTransform().getFullName();
-    for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++);
+    for (int i = 1; this.operators.containsKey(name); i++) {
+      name = getCurrentTransform().getFullName() + i;
+    }
     this.operators.put(name, operator);
-    this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
+    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
   }
 
   public void addStream(PInput input, InputPort inputPort) {
@@ -140,11 +143,12 @@ public class TranslationContext {
       dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
     }
     int streamIndex = 0;
-    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.streams.entrySet()) {
+    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
+        streams.entrySet()) {
       List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
       InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
       if (sinks.length > 0) {
-        dag.addStream("stream"+streamIndex++, streamEntry.getValue().getLeft(), sinks);
+        dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
         for (InputPort port : sinks) {
           PCollection pc = streamEntry.getKey();
           Coder coder = pc.getCoder();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index 202f2d3..dd8fcd1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -17,23 +17,22 @@
  */
 package org.apache.beam.runners.apex.translators.functions;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
-import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
 /**
  * Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
  */
-public class ApexFlattenOperator<InputT> extends BaseOperator
-{
+public class ApexFlattenOperator<InputT> extends BaseOperator {
+
   private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
   private boolean traceTuples = true;
 
@@ -47,16 +46,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
   /**
    * Data input port 1.
    */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
-  {
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
     /**
      * Emits to port "out"
      */
     @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
-    {
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
       if (tuple instanceof WatermarkTuple) {
-        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
         if (wmTuple.getTimestamp() > inputWM1) {
           inputWM1 = wmTuple.getTimestamp();
           if (inputWM1 <= inputWM2) {
@@ -75,7 +73,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
       }
 
       if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
-        ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data1Tag);
+        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
       }
       out.emit(tuple);
     }
@@ -84,16 +82,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
   /**
    * Data input port 2.
    */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
-  {
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
     /**
      * Emits to port "out"
      */
     @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
-    {
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
       if (tuple instanceof WatermarkTuple) {
-        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
         if (wmTuple.getTimestamp() > inputWM2) {
           inputWM2 = wmTuple.getTimestamp();
           if (inputWM2 <= inputWM1) {
@@ -112,7 +109,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
       }
 
       if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
-        ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data2Tag);
+        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
       }
       out.emit(tuple);
     }
@@ -121,6 +118,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
   /**
    * Output port.
    */
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
+    new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 5970f36..845618d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -17,6 +17,20 @@
  */
 package org.apache.beam.runners.apex.translators.functions;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -61,19 +75,6 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
 /**
  * Apex operator for Beam {@link GroupByKey}.
  * This operator expects the input stream already partitioned by K,
@@ -82,8 +83,7 @@ import com.google.common.collect.Multimap;
  * @param <K>
  * @param <V>
  */
-public class ApexGroupByKeyOperator<K, V> implements Operator
-{
+public class ApexGroupByKeyOperator<K, V> implements Operator {
   private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
   private boolean traceTuples = true;
 
@@ -98,7 +98,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
   private final SerializablePipelineOptions serializedOptions;
   @Bind(JavaSerializer.class)
 // TODO: InMemoryStateInternals not serializable
-transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+  private transient Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
   private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
   private transient ProcessContext context;
@@ -106,19 +106,19 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
   private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
   private Instant inputWatermark = new Instant(0);
 
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>()
-  {
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
     @Override
-    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
-    {
+    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
       try {
         if (t instanceof ApexStreamTuple.WatermarkTuple) {
-          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
+          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
           processWatermark(mark);
           if (traceTuples) {
             LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
           }
-          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
+          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
+              mark.getTimestamp()));
           return;
         }
         if (traceTuples) {
@@ -126,53 +126,49 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
         }
         processElement(t.getValue());
       } catch (Exception e) {
-        Throwables.propagate(e);
+        Throwables.propagateIfPossible(e);
+        throw new RuntimeException(e);
       }
     }
   };
 
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
+      output = new DefaultOutputPort<>();
 
   @SuppressWarnings("unchecked")
-  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input)
-  {
-    Preconditions.checkNotNull(pipelineOptions);
+  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input) {
+    checkNotNull(pipelineOptions);
     this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
-    this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder();
-    this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
+    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
+    this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
+    this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
   }
 
   @SuppressWarnings("unused") // for Kryo
-  private ApexGroupByKeyOperator()
-  {
+  private ApexGroupByKeyOperator() {
     this.serializedOptions = null;
   }
 
   @Override
-  public void beginWindow(long l)
-  {
+  public void beginWindow(long l) {
   }
 
   @Override
-  public void endWindow()
-  {
+  public void endWindow() {
   }
 
   @Override
-  public void setup(OperatorContext context)
-  {
+  public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
     StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
-        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
+    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
+        stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
     this.context = new ProcessContext(fn, this.timerInternals);
   }
 
   @Override
-  public void teardown()
-  {
+  public void teardown() {
   }
 
   /**
@@ -181,14 +177,16 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
    * We keep these timers in a Set, so that they are deduplicated, as the same
    * timer can be registered multiple times.
    */
-  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
+      long currentWatermark) {
 
     // we keep the timers to return in a different list and launch them later
     // because we cannot prevent a trigger from registering another trigger,
     // which would lead to concurrent modification exception.
     Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
 
-    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
+        activeTimers.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
 
@@ -223,18 +221,15 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     fn.processElement(context);
   }
 
-  private StateInternals<K> getStateInternalsForKey(K key)
-  {
+  private StateInternals<K> getStateInternalsForKey(K key) {
     final ByteBuffer keyBytes;
     try {
       keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
     StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
     if (stateInternals == null) {
-      //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-      //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
       stateInternals = InMemoryStateInternals.forKey(key);
       perKeyStateInternals.put(keyBytes, stateInternals);
     }
@@ -246,7 +241,7 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     try {
       keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
     Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
     if (timersForKey == null) {
@@ -261,7 +256,7 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     try {
       keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
     } catch (CoderException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
     Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
     if (timersForKey != null) {
@@ -276,7 +271,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
 
   private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
     this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
+        mark.getTimestamp());
     if (!timers.isEmpty()) {
       for (ByteBuffer keyBytes : timers.keySet()) {
         K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
@@ -287,7 +283,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     }
   }
 
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext {
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
+      KeyedWorkItem<K, V>>.ProcessContext {
 
     private final ApexTimerInternals timerInternals;
     private StateInternals<K> stateInternals;
@@ -296,7 +293,7 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
                           ApexTimerInternals timerInternals) {
       function.super();
-      this.timerInternals = Preconditions.checkNotNull(timerInternals);
+      this.timerInternals = checkNotNull(timerInternals);
     }
 
     public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
@@ -311,7 +308,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
 
     @Override
     public Instant timestamp() {
-      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+      throw new UnsupportedOperationException(
+          "timestamp() is not available when processing KeyedWorkItems.");
     }
 
     @Override
@@ -333,7 +331,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
 
     @Override
     public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+      throw new UnsupportedOperationException(
+          "pane() is not available when processing KeyedWorkItems.");
     }
 
     @Override
@@ -352,11 +351,13 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
         }
 
         @Override
-        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
           if (traceTuples) {
             LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
           }
-          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
+              WindowedValue.of(output, timestamp, windows, pane)));
         }
 
         @Override
@@ -375,7 +376,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
         }
 
         @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
           throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
         }
 
@@ -404,7 +406,8 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
     }
 
     @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException();
     }
   }
@@ -416,52 +419,44 @@ transient  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new
   public class ApexTimerInternals implements TimerInternals {
 
     @Override
-    public void setTimer(TimerData timerKey)
-    {
+    public void setTimer(TimerData timerKey) {
       registerActiveTimer(context.element().key(), timerKey);
     }
 
     @Override
-    public void deleteTimer(TimerData timerKey)
-    {
+    public void deleteTimer(TimerData timerKey) {
       unregisterActiveTimer(context.element().key(), timerKey);
     }
 
     @Override
-    public Instant currentProcessingTime()
-    {
+    public Instant currentProcessingTime() {
       return Instant.now();
     }
 
     @Override
-    public Instant currentSynchronizedProcessingTime()
-    {
+    public Instant currentSynchronizedProcessingTime() {
       // TODO Auto-generated method stub
       return null;
     }
 
     @Override
-    public Instant currentInputWatermarkTime()
-    {
+    public Instant currentInputWatermarkTime() {
       return inputWatermark;
     }
 
     @Override
-    public Instant currentOutputWatermarkTime()
-    {
+    public Instant currentOutputWatermarkTime() {
       // TODO Auto-generated method stub
       return null;
     }
-
   }
 
-  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable
-  {
+  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
+    private static final long serialVersionUID = 1L;
+
     @Override
-    public StateInternals<K> stateInternalsForKey(K key)
-    {
+    public StateInternals<K> stateInternalsForKey(K key) {
       return getStateInternalsForKey(key);
     }
   }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 96be11d..9e8f3dc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -17,6 +17,18 @@
  */
 package org.apache.beam.runners.apex.translators.functions;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +41,9 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption
 import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -52,18 +64,6 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
 /**
  * Apex operator for Beam {@link DoFn}.
  */
@@ -85,8 +85,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private final List<PCollectionView<?>> sideInputs;
 
 // TODO: not Kryo serializable, integrate codec
-//@Bind(JavaSerializer.class)
-private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
+  private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals
+      .forKey(null);
   private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
   private LongMin pushedBackWatermark = new LongMin();
   private long currentInputWatermark = Long.MIN_VALUE;
@@ -94,7 +94,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
 
   private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
   private transient SideInputHandler sideInputHandler;
-  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = Maps.newHashMapWithExpectedSize(5);
+  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
+      Maps.newHashMapWithExpectedSize(5);
 
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
@@ -104,8 +105,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       WindowingStrategy<?, ?> windowingStrategy,
       List<PCollectionView<?>> sideInputs,
       Coder<WindowedValue<InputT>> inputCoder
-      )
-  {
+      ) {
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
     this.doFn = doFn;
     this.mainOutputTag = mainOutputTag;
@@ -120,7 +120,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     }
 
     Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
-    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), coder);
+    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
+        coder);
 
   }
 
@@ -135,13 +136,12 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     this.pushedBack = null;
   }
 
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
-  {
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
     @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> t)
-    {
+    public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
       if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        processWatermark((ApexStreamTuple.WatermarkTuple<?>)t);
+        processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
       } else {
         if (traceTuples) {
           LOG.debug("\ninput {}\n", t.getValue());
@@ -155,12 +155,11 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     }
   };
 
-  @InputPortFieldAnnotation(optional=true)
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>()
-  {
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
     @Override
-    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t)
-    {
+    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
       if (t instanceof ApexStreamTuple.WatermarkTuple) {
         // ignore side input watermarks
         return;
@@ -168,7 +167,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
 
       int sideInputIndex = 0;
       if (t instanceof ApexStreamTuple.DataTuple) {
-        sideInputIndex = ((ApexStreamTuple.DataTuple<?>)t).getUnionTag();
+        sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag();
       }
 
       if (traceTuples) {
@@ -196,25 +195,30 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     }
   };
 
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
 
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional=true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, sideOutput3, sideOutput4, sideOutput5};
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
+      new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
+      sideOutput3, sideOutput4, sideOutput5};
 
   @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
-  {
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
     DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
     if (sideOutputPort != null) {
       sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
@@ -229,19 +233,19 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
   private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
     try {
       pushbackDoFnRunner.startBundle();
-      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner.processElementInReadyWindows(elem);
+      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
+          .processElementInReadyWindows(elem);
       pushbackDoFnRunner.finishBundle();
       return pushedBack;
     } catch (UserCodeException ue) {
       if (ue.getCause() instanceof AssertionError) {
-        ApexRunner.assertionError = (AssertionError)ue.getCause();
+        ApexRunner.assertionError = (AssertionError) ue.getCause();
       }
       throw ue;
     }
   }
 
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark)
-  {
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
     this.currentInputWatermark = mark.getTimestamp();
 
     if (sideInputs.isEmpty()) {
@@ -264,8 +268,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
   }
 
   @Override
-  public void setup(OperatorContext context)
-  {
+  public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
     SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
     if (!sideInputs.isEmpty()) {
@@ -273,9 +276,10 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       sideInputReader = sideInputHandler;
     }
 
-    for (int i=0; i < sideOutputTags.size(); i++) {
+    for (int i = 0; i < sideOutputTags.size(); i++) {
       @SuppressWarnings("unchecked")
-      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)sideOutputPorts[i];
+      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
+          sideOutputPorts[i];
       sideOutputPortMapping.put(sideOutputTags.get(i), port);
     }
 
@@ -297,25 +301,18 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     try {
       doFn.setup();
     } catch (Exception e) {
-      Throwables.propagate(e);
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
     }
 
   }
 
   @Override
-  public void beginWindow(long windowId)
-  {
-    /*
-    Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
-    if (!aggregators.isEmpty()) {
-      System.out.println("\n" + Thread.currentThread().getName() + "\n" +AggregatorRetriever.getAggregators(doFn) + "\n");
-    }
-    */
+  public void beginWindow(long windowId) {
   }
 
   @Override
-  public void endWindow()
-  {
+  public void endWindow() {
   }
 
   /**
@@ -334,32 +331,27 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       return new NoOpAggregator<InputT, OutputT>();
     }
 
-    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable
-    {
+    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
+        java.io.Serializable {
       private static final long serialVersionUID = 1L;
 
       @Override
-      public void addValue(InputT value)
-      {
+      public void addValue(InputT value) {
       }
 
       @Override
-      public String getName()
-      {
+      public String getName() {
         // TODO Auto-generated method stub
         return null;
       }
 
       @Override
-      public CombineFn<InputT, ?, OutputT> getCombineFn()
-      {
+      public CombineFn<InputT, ?, OutputT> getCombineFn() {
         // TODO Auto-generated method stub
         return null;
       }
 
     };
-
-
   }
 
   private static class LongMin {