Posted to commits@beam.apache.org by th...@apache.org on 2016/11/08 03:04:37 UTC

[4/6] incubator-beam git commit: BEAM-261 Make translators package private.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
new file mode 100644
index 0000000..44e7b11
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex operator for Beam {@link DoFn}.
+ */
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
+  private boolean traceTuples = true;
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions pipelineOptions;
+  @Bind(JavaSerializer.class)
+  private final OldDoFn<InputT, OutputT> doFn;
+  @Bind(JavaSerializer.class)
+  private final TupleTag<OutputT> mainOutputTag;
+  @Bind(JavaSerializer.class)
+  private final List<TupleTag<?>> sideOutputTags;
+  @Bind(JavaSerializer.class)
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  @Bind(JavaSerializer.class)
+  private final List<PCollectionView<?>> sideInputs;
+
+  private final StateInternals<Void> sideInputStateInternals;
+  private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
+  private LongMin pushedBackWatermark = new LongMin();
+  private long currentInputWatermark = Long.MIN_VALUE;
+  private long currentOutputWatermark = currentInputWatermark;
+
+  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
+  private transient SideInputHandler sideInputHandler;
+  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
+      Maps.newHashMapWithExpectedSize(5);
+
+  public ApexParDoOperator(
+      ApexPipelineOptions pipelineOptions,
+      OldDoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      List<PCollectionView<?>> sideInputs,
+      Coder<WindowedValue<InputT>> inputCoder,
+      StateInternalsFactory<Void> stateInternalsFactory
+      ) {
+    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+    this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
+
+    if (sideOutputTags.size() > sideOutputPorts.length) {
+      String msg = String.format("Too many side outputs (currently only supporting %s).",
+          sideOutputPorts.length);
+      throw new UnsupportedOperationException(msg);
+    }
+
+    Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
+    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
+        coder);
+
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ApexParDoOperator() {
+    this.pipelineOptions = null;
+    this.doFn = null;
+    this.mainOutputTag = null;
+    this.sideOutputTags = null;
+    this.windowingStrategy = null;
+    this.sideInputs = null;
+    this.pushedBack = null;
+    this.sideInputStateInternals = null;
+  }
+
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
+      if (t instanceof ApexStreamTuple.WatermarkTuple) {
+        processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
+      } else {
+        if (traceTuples) {
+          LOG.debug("\ninput {}\n", t.getValue());
+        }
+        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
+        for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+          pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+          pushedBack.get().add(pushedBackValue);
+        }
+      }
+    }
+  };
+
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
+      new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
+      if (t instanceof ApexStreamTuple.WatermarkTuple) {
+        // ignore side input watermarks
+        return;
+      }
+
+      int sideInputIndex = 0;
+      if (t instanceof ApexStreamTuple.DataTuple) {
+        sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag();
+      }
+
+      if (traceTuples) {
+        LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue());
+      }
+
+      PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
+      sideInputHandler.addSideInputValue(sideInput, t.getValue());
+
+      List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+      for (WindowedValue<InputT> elem : pushedBack.get()) {
+        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
+
+      pushedBack.get().clear();
+      pushedBackWatermark.clear();
+      for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+        pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+        pushedBack.get().add(pushedBackValue);
+      }
+
+      // potentially emit watermark
+      processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
+      new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
+      new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
+      sideOutput3, sideOutput4, sideOutput5};
+
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
+    DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
+    if (sideOutputPort != null) {
+      sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
+    } else {
+      output.emit(ApexStreamTuple.DataTuple.of(tuple));
+    }
+    if (traceTuples) {
+      LOG.debug("\nemitting {}\n", tuple);
+    }
+  }
+
+  private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    try {
+      pushbackDoFnRunner.startBundle();
+      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
+          .processElementInReadyWindows(elem);
+      pushbackDoFnRunner.finishBundle();
+      return pushedBack;
+    } catch (UserCodeException ue) {
+      if (ue.getCause() instanceof AssertionError) {
+        ApexRunner.assertionError = (AssertionError) ue.getCause();
+      }
+      throw ue;
+    }
+  }
+
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+    this.currentInputWatermark = mark.getTimestamp();
+
+    if (sideInputs.isEmpty()) {
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(mark);
+      return;
+    }
+
+    long potentialOutputWatermark =
+        Math.min(pushedBackWatermark.get(), currentInputWatermark);
+    if (potentialOutputWatermark > currentOutputWatermark) {
+      currentOutputWatermark = potentialOutputWatermark;
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+      }
+      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+    }
+  }
+
+  @Override
+  public void setup(OperatorContext context) {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+    if (!sideInputs.isEmpty()) {
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+    }
+
+    for (int i = 0; i < sideOutputTags.size(); i++) {
+      @SuppressWarnings("unchecked")
+      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
+          sideOutputPorts[i];
+      sideOutputPortMapping.put(sideOutputTags.get(i), port);
+    }
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+        pipelineOptions.get(),
+        doFn,
+        sideInputReader,
+        this,
+        mainOutputTag,
+        sideOutputTags,
+        new NoOpStepContext(),
+        new NoOpAggregatorFactory(),
+        windowingStrategy
+        );
+
+    pushbackDoFnRunner =
+        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
+    try {
+      doFn.setup();
+    } catch (Exception e) {
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  @Override
+  public void beginWindow(long windowId) {
+  }
+
+  @Override
+  public void endWindow() {
+  }
+
+  /**
+   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
+   * It is called from {@link org.apache.beam.runners.core.SimpleDoFnRunner}.
+   */
+  public static class NoOpAggregatorFactory implements AggregatorFactory {
+
+    private NoOpAggregatorFactory() {
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+        Class<?> fnClass, ExecutionContext.StepContext step,
+        String name, CombineFn<InputT, AccumT, OutputT> combine) {
+      return new NoOpAggregator<>();
+    }
+
+    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
+        java.io.Serializable {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public void addValue(InputT value) {
+      }
+
+      @Override
+      public String getName() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+
+      @Override
+      public CombineFn<InputT, ?, OutputT> getCombineFn() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+
+    };
+  }
+
+  private static class LongMin {
+    long state = Long.MAX_VALUE;
+
+    public void add(long l) {
+      state = Math.min(state, l);
+    }
+
+    public long get() {
+      return state;
+    }
+
+    public void clear() {
+      state = Long.MAX_VALUE;
+    }
+
+  }
+
+}
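
For reference, a minimal construction sketch for the operator above (not part of this commit; imports omitted, and it assumes the Beam 0.3.0-incubating SDK helpers PipelineOptionsFactory, WindowingStrategy, WindowedValue and StringUtf8Coder, plus the ApexStateInternalsFactory introduced further down in this change):

    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
    OldDoFn<String, String> doFn = new OldDoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
      }
    };
    ApexParDoOperator<String, String> operator = new ApexParDoOperator<>(
        options,
        doFn,
        new TupleTag<String>(),                          // main output
        Collections.<TupleTag<?>>emptyList(),            // no side outputs
        WindowingStrategy.globalDefault(),
        Collections.<PCollectionView<?>>emptyList(),     // no side inputs
        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
        new ApexStateInternals.ApexStateInternalsFactory<Void>());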

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
new file mode 100644
index 0000000..6fc2f0c
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation.operators;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
+import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apex input operator that wraps Beam {@link UnboundedSource}.
+ */
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
+    extends UnboundedSource.CheckpointMark> implements InputOperator {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ApexReadUnboundedInputOperator.class);
+  private boolean traceTuples = false;
+  private long outputWatermark = 0;
+
+  @Bind(JavaSerializer.class)
+  private final SerializablePipelineOptions pipelineOptions;
+  @Bind(JavaSerializer.class)
+  private final UnboundedSource<OutputT, CheckpointMarkT> source;
+  private final boolean isBoundedSource;
+  private transient UnboundedSource.UnboundedReader<OutputT> reader;
+  private transient boolean available = false;
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
+      new DefaultOutputPort<>();
+
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      ApexPipelineOptions options) {
+    this.pipelineOptions = new SerializablePipelineOptions(options);
+    this.source = source;
+    this.isBoundedSource = false;
+  }
+
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      boolean isBoundedSource, ApexPipelineOptions options) {
+    this.pipelineOptions = new SerializablePipelineOptions(options);
+    this.source = source;
+    this.isBoundedSource = isBoundedSource;
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ApexReadUnboundedInputOperator() {
+    this.pipelineOptions = null;
+    this.source = null;
+    this.isBoundedSource = false;
+  }
+
+  @Override
+  public void beginWindow(long windowId) {
+    if (!available && (isBoundedSource || source instanceof ValuesSource)) {
+      // if it's a Create and the input was consumed, emit final watermark
+      emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
+      // terminate the stream (allows tests to finish faster)
+      BaseOperator.shutdown();
+    } else {
+      emitWatermarkIfNecessary(reader.getWatermark().getMillis());
+    }
+  }
+
+  private void emitWatermarkIfNecessary(long mark) {
+    if (mark > outputWatermark) {
+      outputWatermark = mark;
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
+    }
+  }
+
+  @Override
+  public void endWindow() {
+  }
+
+  @Override
+  public void setup(OperatorContext context) {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    try {
+      reader = source.createReader(this.pipelineOptions.get(), null);
+      available = reader.start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void teardown() {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void emitTuples() {
+    try {
+      if (!available) {
+        available = reader.advance();
+      }
+      if (available) {
+        OutputT data = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+        available = reader.advance();
+        if (traceTuples) {
+          LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp);
+        }
+        output.emit(DataTuple.of(WindowedValue.of(
+            data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
+      }
+    } catch (Exception e) {
+      Throwables.propagateIfPossible(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+}
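
For reference, a minimal sketch of wrapping an in-memory source with this operator (not part of this commit; imports omitted, assumes PipelineOptionsFactory and StringUtf8Coder from the SDK and the ValuesSource utility added later in this change):

    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
    UnboundedSource<String, UnboundedSource.CheckpointMark> source =
        new ValuesSource<>(Arrays.asList("a", "b", "c"), StringUtf8Coder.of());
    ApexReadUnboundedInputOperator<String, UnboundedSource.CheckpointMark> read =
        new ApexReadUnboundedInputOperator<>(source, options);
    // The Apex engine then drives the operator: setup() creates the reader,
    // emitTuples() emits windowed values, and beginWindow() advances the watermark.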

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
new file mode 100644
index 0000000..6bc0194
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Apex operator implementations for the Apache Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translation.operators;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
new file mode 100644
index 0000000..de954c0
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translation;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
new file mode 100644
index 0000000..17a4f81
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTag.StateBinder;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * Implementation of {@link StateInternals} that can be serialized and
+ * checkpointed with the operator. Suitable for small state; in the future this
+ * should be based on the incremental state saving components in the Apex
+ * library.
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
+  private static final long serialVersionUID = 1L;
+  public static <K> ApexStateInternals<K> forKey(K key) {
+    return new ApexStateInternals<>(key);
+  }
+
+  private final K key;
+
+  protected ApexStateInternals(K key) {
+    this.key = key;
+  }
+
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  /**
+   * Serializable state for internals (namespace to state tag to coded value).
+   */
+  private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
+
+  @Override
+  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
+    return address.bind(new ApexStateBinder(key, namespace, address, c));
+  }
+
+  /**
+   * A {@link StateBinder} that returns {@link State} wrappers for serialized state.
+   */
+  private class ApexStateBinder implements StateBinder<K> {
+    private final K key;
+    private final StateNamespace namespace;
+    private final StateContext<?> c;
+
+    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
+        StateContext<?> c) {
+      this.key = key;
+      this.namespace = namespace;
+      this.c = c;
+    }
+
+    @Override
+    public <T> ValueState<T> bindValue(
+        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
+      return new ApexValueState<>(namespace, address, coder);
+    }
+
+    @Override
+    public <T> BagState<T> bindBag(
+        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
+      return new ApexBagState<>(namespace, address, elemCoder);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            final CombineFn<InputT, AccumT, OutputT> combineFn) {
+      return new ApexAccumulatorCombiningState<>(
+          namespace,
+          address,
+          accumCoder,
+          key,
+          combineFn.<K>asKeyedFn()
+          );
+    }
+
+    @Override
+    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValue(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      return new ApexAccumulatorCombiningState<>(
+          namespace,
+          address,
+          accumCoder,
+          key, combineFn);
+    }
+
+    @Override
+    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
+        bindKeyedCombiningValueWithContext(
+            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+            Coder<AccumT> accumCoder,
+            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
+      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
+    }
+  }
+
+  private class AbstractState<T> {
+    protected final StateNamespace namespace;
+    protected final StateTag<?, ? extends State> address;
+    protected final Coder<T> coder;
+
+    private AbstractState(
+        StateNamespace namespace,
+        StateTag<?, ? extends State> address,
+        Coder<T> coder) {
+      this.namespace = namespace;
+      this.address = address;
+      this.coder = coder;
+    }
+
+    protected T readValue() {
+      T value = null;
+      byte[] buf = stateTable.get(namespace.stringKey(), address.getId());
+      if (buf != null) {
+        // TODO: reuse input
+        Input input = new Input(buf);
+        try {
+          return coder.decode(input, Context.OUTER);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return value;
+    }
+
+    public void writeValue(T input) {
+      ByteArrayOutputStream output = new ByteArrayOutputStream();
+      try {
+        coder.encode(input, output, Context.OUTER);
+        stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public void clear() {
+      stateTable.remove(namespace.stringKey(), address.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      @SuppressWarnings("unchecked")
+      AbstractState<?> that = (AbstractState<?>) o;
+      return namespace.equals(that.namespace) && address.equals(that.address);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> {
+
+    private ApexValueState(
+        StateNamespace namespace,
+        StateTag<?, ValueState<T>> address,
+        Coder<T> coder) {
+      super(namespace, address, coder);
+    }
+
+    @Override
+    public ApexValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      return readValue();
+    }
+
+    @Override
+    public void write(T input) {
+      writeValue(input);
+    }
+  }
+
+  private final class ApexWatermarkHoldState<W extends BoundedWindow>
+      extends AbstractState<Instant> implements WatermarkHoldState<W> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+
+    public ApexWatermarkHoldState(
+        StateNamespace namespace,
+        StateTag<?, WatermarkHoldState<W>> address,
+        OutputTimeFn<? super W> outputTimeFn) {
+      super(namespace, address, InstantCoder.of());
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public ApexWatermarkHoldState<W> readLater() {
+      return this;
+    }
+
+    @Override
+    public Instant read() {
+      return readValue();
+    }
+
+    @Override
+    public void add(Instant outputTime) {
+      Instant combined = read();
+      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
+      writeValue(combined);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == null;
+        }
+      };
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+  }
+
+  private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
+      extends AbstractState<AccumT>
+      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
+    private final K key;
+    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+
+    private ApexAccumulatorCombiningState(StateNamespace namespace,
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
+        Coder<AccumT> coder,
+        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+      super(namespace, address, coder);
+      this.key = key;
+      this.combineFn = combineFn;
+    }
+
+    @Override
+    public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public OutputT read() {
+      return combineFn.extractOutput(key, getAccum());
+    }
+
+    @Override
+    public void add(InputT input) {
+      AccumT accum = getAccum();
+      combineFn.addInput(key, accum, input);
+      writeValue(accum);
+    }
+
+    @Override
+    public AccumT getAccum() {
+      AccumT accum = readValue();
+      if (accum == null) {
+        accum = combineFn.createAccumulator(key);
+      }
+      return accum;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == null;
+        }
+      };
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
+      writeValue(accum);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(key, accumulators);
+    }
+
+  }
+
+  private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
+    private ApexBagState(
+        StateNamespace namespace,
+        StateTag<?, BagState<T>> address,
+        Coder<T> coder) {
+      super(namespace, address, ListCoder.of(coder));
+    }
+
+    @Override
+    public ApexBagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public List<T> read() {
+      List<T> value = super.readValue();
+      if (value == null) {
+        value = new ArrayList<>();
+      }
+      return value;
+    }
+
+    @Override
+    public void add(T input) {
+      List<T> value = read();
+      value.add(input);
+      writeValue(value);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+
+        @Override
+        public Boolean read() {
+          return stateTable.get(namespace.stringKey(), address.getId()) == null;
+        }
+      };
+    }
+  }
+
+  /**
+   * Factory for {@link ApexStateInternals}.
+   *
+   * @param <K> key type
+   */
+  public static class ApexStateInternalsFactory<K>
+      implements StateInternalsFactory<K>, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public StateInternals<K> stateInternalsForKey(K key) {
+      return ApexStateInternals.forKey(key);
+    }
+  }
+
+}
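
For reference, a small usage sketch for the state internals above (not part of this commit; imports omitted, assumes the SDK's StateTags, StateNamespaces and VarIntCoder helpers from this Beam release):

    StateInternals<String> internals = ApexStateInternals.forKey("user-1");
    StateTag<Object, ValueState<Integer>> countTag =
        StateTags.value("count", VarIntCoder.of());
    ValueState<Integer> count = internals.state(StateNamespaces.global(), countTag);
    count.write(42);
    count.read();   // returns 42; the value is kept coded in the stateTable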

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
new file mode 100644
index 0000000..79a4f1b
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.datatorrent.api.Operator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+
+/**
+ * The common interface for all objects transmitted through streams.
+ *
+ * @param <T> The actual payload type.
+ */
+public interface ApexStreamTuple<T> {
+  /**
+   * Gets the value of the tuple.
+   *
+   * @return the value of the tuple
+   */
+  T getValue();
+
+  /**
+   * Data tuple class.
+   *
+   * @param <T> tuple type
+   */
+  class DataTuple<T> implements ApexStreamTuple<T> {
+    private int unionTag;
+    private T value;
+
+    public static <T> DataTuple<T> of(T value) {
+      return new DataTuple<>(value, 0);
+    }
+
+    private DataTuple(T value, int unionTag) {
+      this.value = value;
+      this.unionTag = unionTag;
+    }
+
+    @Override
+    public T getValue() {
+      return value;
+    }
+
+    public void setValue(T value) {
+      this.value = value;
+    }
+
+    public int getUnionTag() {
+      return unionTag;
+    }
+
+    public void setUnionTag(int unionTag) {
+      this.unionTag = unionTag;
+    }
+
+    @Override
+    public String toString() {
+      return value.toString();
+    }
+
+  }
+
+  /**
+   * Tuple that includes a timestamp.
+   *
+   * @param <T> tuple type
+   */
+  class TimestampedTuple<T> extends DataTuple<T> {
+    private long timestamp;
+
+    public TimestampedTuple(long timestamp, T value) {
+      super(value, 0);
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TimestampedTuple)) {
+        return false;
+      } else {
+        TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
+        return (timestamp == other.timestamp) && Objects.equals(this.getValue(), other.getValue());
+      }
+    }
+
+  }
+
+  /**
+   * Tuple that represents a watermark.
+   *
+   * @param <T> tuple type
+   */
+  class WatermarkTuple<T> extends TimestampedTuple<T> {
+    public static <T> WatermarkTuple<T> of(long timestamp) {
+      return new WatermarkTuple<>(timestamp);
+    }
+
+    protected WatermarkTuple(long timestamp) {
+      super(timestamp, null);
+    }
+
+    @Override
+    public String toString() {
+      return "[Watermark " + getTimestamp() + "]";
+    }
+  }
+
+  /**
+   * Coder for {@link ApexStreamTuple}.
+   */
+  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+    private static final long serialVersionUID = 1L;
+    final Coder<T> valueCoder;
+
+    public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
+      return new ApexStreamTupleCoder<>(valueCoder);
+    }
+
+    protected ApexStreamTupleCoder(Coder<T> valueCoder) {
+      this.valueCoder = checkNotNull(valueCoder);
+    }
+
+    @Override
+    public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      if (value instanceof WatermarkTuple) {
+        outStream.write(1);
+        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
+      } else {
+        outStream.write(0);
+        outStream.write(((DataTuple<?>) value).unionTag);
+        valueCoder.encode(value.getValue(), outStream, context);
+      }
+    }
+
+    @Override
+    public ApexStreamTuple<T> decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      int b = inStream.read();
+      if (b == 1) {
+        return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
+      } else {
+        int unionTag = inStream.read();
+        return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
+      }
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.<Coder<?>>asList(valueCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(
+          this.getClass().getSimpleName() + " requires a deterministic valueCoder",
+          valueCoder);
+    }
+
+    /**
+     * Returns the value coder.
+     */
+    public Coder<T> getValueCoder() {
+      return valueCoder;
+    }
+
+  }
+
+  /**
+   * Central place to decide whether data tuples received on and emitted from ports
+   * should be logged. Should be called in setup and the value cached in the operator.
+   */
+  final class Logging {
+    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
+      return options.isTupleTracingEnabled();
+    }
+  }
+
+}
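
For reference, a round-trip sketch using the coder above (not part of this commit; imports and checked exceptions omitted, assumes StringUtf8Coder from the SDK):

    ApexStreamTupleCoder<String> coder = ApexStreamTupleCoder.of(StringUtf8Coder.of());
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    coder.encode(ApexStreamTuple.DataTuple.of("hello"), bos, Coder.Context.OUTER);
    ApexStreamTuple<String> decoded =
        coder.decode(new ByteArrayInputStream(bos.toByteArray()), Coder.Context.OUTER);
    // decoded is a DataTuple with union tag 0 and value "hello"; a leading marker
    // byte distinguishes data tuples (0) from watermark tuples (1).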

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
new file mode 100644
index 0000000..d08e76f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+/**
+ * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
+ */
+public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
+  private static final long serialVersionUID = 1L;
+  private final Coder<? super Object> coder;
+
+  public CoderAdapterStreamCodec(Coder<? super Object> coder) {
+    this.coder = coder;
+  }
+
+  @Override
+  public Object fromByteArray(Slice fragment) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset,
+        fragment.length);
+    try {
+      return coder.decode(bis, Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Slice toByteArray(Object wv) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      coder.encode(wv, bos, Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new Slice(bos.toByteArray());
+  }
+
+  @Override
+  public int getPartition(Object o) {
+    return o.hashCode();
+  }
+
+}
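
For reference, a round-trip sketch for the stream codec above (not part of this commit; imports omitted, assumes StringUtf8Coder from the SDK; a raw cast is used because the codec is declared over Object):

    @SuppressWarnings({"unchecked", "rawtypes"})
    CoderAdapterStreamCodec codec = new CoderAdapterStreamCodec((Coder) StringUtf8Coder.of());
    Slice slice = codec.toByteArray("hello");       // encode with the Beam coder
    Object restored = codec.fromByteArray(slice);   // "hello"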

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
new file mode 100644
index 0000000..078f95f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Serializable {@link ExecutionContext.StepContext} that does nothing.
+ */
+public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public String getStepName() {
+    return null;
+  }
+
+  @Override
+  public String getTransformName() {
+    return null;
+  }
+
+  @Override
+  public void noteOutput(WindowedValue<?> output) {
+  }
+
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+  }
+
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
+      IOException {
+
+  }
+
+  @Override
+  public StateInternals<?> stateInternals() {
+    return null;
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
new file mode 100644
index 0000000..d0dce2b
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A wrapper to enable serialization of {@link PipelineOptions}.
+ */
+public class SerializablePipelineOptions implements Externalizable {
+
+  private transient ApexPipelineOptions pipelineOptions;
+
+  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  public SerializablePipelineOptions() {
+  }
+
+  public ApexPipelineOptions get() {
+    return this.pipelineOptions;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    String s = in.readUTF();
+    this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
+        .as(ApexPipelineOptions.class);
+  }
+
+}
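
For reference, a Java-serialization round trip for the wrapper above (not part of this commit; imports and checked exceptions omitted):

    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
    SerializablePipelineOptions wrapped = new SerializablePipelineOptions(options);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(wrapped);                        // invokes writeExternal (JSON via Jackson)
    oos.flush();

    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
    SerializablePipelineOptions restored = (SerializablePipelineOptions) ois.readObject();
    ApexPipelineOptions copy = restored.get();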

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
new file mode 100644
index 0000000..395ad1f
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+
+/**
+ * A {@link KryoSerializable} holder that uses the specified {@link Coder}.
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
+  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+  private T value;
+  private Coder<T> coder;
+
+  public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
+    this.value = value;
+    this.coder = coder;
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ValueAndCoderKryoSerializable() {
+  }
+
+  public T get() {
+    return value;
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output) {
+    try {
+      kryo.writeClass(output, coder.getClass());
+      kryo.writeObject(output, coder, JAVA_SERIALIZER);
+      coder.encode(value, output, Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<Coder<T>> type = kryo.readClass(input).getType();
+      coder = kryo.readObject(input, type, JAVA_SERIALIZER);
+      value = coder.decode(input, Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
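
A minimal sketch of the Kryo round trip this class supports, with StringUtf8Coder
chosen purely for illustration and the example class name hypothetical. write()
records the coder class, the coder itself (via Java serialization), and the
coder-encoded value; read() reverses those steps:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class ValueAndCoderKryoRoundTrip {
  public static void main(String[] args) {
    ValueAndCoderKryoSerializable<String> holder =
        new ValueAndCoderKryoSerializable<>("hello", StringUtf8Coder.of());

    Kryo kryo = new Kryo();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (Output output = new Output(bos)) {
      holder.write(kryo, output); // coder class, coder, then coded value
    }

    ValueAndCoderKryoSerializable<String> copy =
        new ValueAndCoderKryoSerializable<>(null, null);
    try (Input input = new Input(new ByteArrayInputStream(bos.toByteArray()))) {
      copy.read(kryo, input); // restores the coder, then decodes the value
    }
    System.out.println(copy.get()); // prints "hello"
  }
}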

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
new file mode 100644
index 0000000..8526618
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+/**
+ * Unbounded source that reads from a Java {@link Iterable}.
+ */
+public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+  private static final long serialVersionUID = 1L;
+
+  private final byte[] codedValues;
+  private final IterableCoder<T> iterableCoder;
+
+  public ValuesSource(Iterable<T> values, Coder<T> coder) {
+    this.iterableCoder = IterableCoder.of(coder);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      iterableCoder.encode(values, bos, Context.OUTER);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    this.codedValues = bos.toByteArray();
+  }
+
+  @Override
+  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options,
+      @Nullable CheckpointMark checkpointMark) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
+    try {
+      Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
+      return new ValuesReader<>(values, this);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  @Override
+  public void validate() {
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return iterableCoder.getElemCoder();
+  }
+
+  private static class ValuesReader<T> extends UnboundedReader<T> {
+
+    private final Iterable<T> values;
+    private final UnboundedSource<T, CheckpointMark> source;
+    private transient Iterator<T> iterator;
+    private T current;
+
+    public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
+      this.values = values;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      if (null == iterator) {
+        iterator = values.iterator();
+      }
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<T, ?> getCurrentSource() {
+      return source;
+    }
+  }
+}
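
A minimal sketch of driving the source directly, assuming StringUtf8Coder and
default PipelineOptions; the class name is hypothetical. The reader's start()
positions on the first element and advance() steps through the rest:

import java.util.Arrays;

import org.apache.beam.runners.apex.translation.utils.ValuesSource;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ValuesSourceReadLoop {
  public static void main(String[] args) throws Exception {
    ValuesSource<String> source =
        new ValuesSource<>(Arrays.asList("a", "b", "c"), StringUtf8Coder.of());

    PipelineOptions options = PipelineOptionsFactory.create();
    UnboundedSource.UnboundedReader<String> reader = source.createReader(options, null);

    // Values were coder-encoded in the constructor and are decoded again on read.
    for (boolean more = reader.start(); more; more = reader.advance()) {
      System.out.println(reader.getCurrent()); // a, b, c
    }
    reader.close();
  }
}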

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
new file mode 100644
index 0000000..534b645
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility classes used by the Apache Apex runner translation.
+ */
+package org.apache.beam.runners.apex.translation.utils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
deleted file mode 100644
index 539f311..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PBegin;
-
-
-/**
- * Wraps elements from Create.Values into an {@link UnboundedSource}.
- * Mainly used for testing.
- */
-public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
-  private static final long serialVersionUID = 1451000241832745629L;
-
-  @Override
-  public void translate(Create.Values<T> transform, TranslationContext context) {
-    try {
-      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
-          transform.getDefaultOutputCoder((PBegin) context.getInput()));
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, context.getPipelineOptions());
-      context.addOperator(operator, operator.output);
-    } catch (CannotProvideCoderException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
deleted file mode 100644
index a39aacb..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.google.common.collect.Lists;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator;
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.runners.apex.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-/**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
- */
-public class FlattenPCollectionTranslator<T> implements
-    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
-    PCollectionList<T> input = context.getInput();
-    List<PCollection<T>> collections = input.getAll();
-
-    if (collections.isEmpty()) {
-      // create a dummy source that never emits anything
-      @SuppressWarnings("unchecked")
-      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
-          (Coder<T>) VoidCoder.of());
-      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, context.getPipelineOptions());
-      context.addOperator(operator, operator.output);
-    } else {
-      PCollection<T> output = context.getOutput();
-      Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
-      flattenCollections(collections, unionTags, output, context);
-    }
-  }
-
-  /**
-   * Flatten the given collections into the given result collection. Translates
-   * into a cascading merge with 2 input ports per operator. The optional union
-   * tags can be used to identify the source in the result stream, used to
-   * channel multiple side inputs to a single Apex operator port.
-   *
-   * @param collections
-   * @param unionTags
-   * @param finalCollection
-   * @param context
-   */
-  static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>,
-      Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) {
-    List<PCollection<T>> remainingCollections = Lists.newArrayList();
-    PCollection<T> firstCollection = null;
-    while (!collections.isEmpty()) {
-      for (PCollection<T> collection : collections) {
-        if (null == firstCollection) {
-          firstCollection = collection;
-        } else {
-          ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
-          context.addStream(firstCollection, operator.data1);
-          Integer unionTag = unionTags.get(firstCollection);
-          operator.data1Tag = (unionTag != null) ? unionTag : 0;
-          context.addStream(collection, operator.data2);
-          unionTag = unionTags.get(collection);
-          operator.data2Tag = (unionTag != null) ? unionTag : 0;
-
-          if (!collection.getCoder().equals(firstCollection.getCoder())) {
-              throw new UnsupportedOperationException("coders don't match");
-          }
-
-          if (collections.size() > 2) {
-            PCollection<T> intermediateCollection = intermediateCollection(collection,
-                collection.getCoder());
-            context.addOperator(operator, operator.out, intermediateCollection);
-            remainingCollections.add(intermediateCollection);
-          } else {
-            // final stream merge
-            context.addOperator(operator, operator.out, finalCollection);
-          }
-          firstCollection = null;
-        }
-      }
-      if (firstCollection != null) {
-        // push to next merge level
-        remainingCollections.add(firstCollection);
-        firstCollection = null;
-      }
-      if (remainingCollections.size() > 1) {
-        collections = remainingCollections;
-        remainingCollections = Lists.newArrayList();
-      } else {
-        collections = Lists.newArrayList();
-      }
-    }
-  }
-
-  static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
-    PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-        input.getWindowingStrategy(), input.isBounded());
-    output.setCoder(outputCoder);
-    return output;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
deleted file mode 100644
index cb78579..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link GroupByKey} translation to Apex operator.
- */
-public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-    PCollection<KV<K, V>> input = context.getInput();
-    ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
-        input, context.<K>stateInternalsFactory()
-        );
-    context.addOperator(group, group.output);
-    context.addStream(input, group.input);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
deleted file mode 100644
index 987b729..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.OutputPort;
-import com.google.common.collect.Maps;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-public class ParDoBoundMultiTranslator<InputT, OutputT>
-    implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class);
-
-  @Override
-  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
-    PCollectionTuple output = context.getOutput();
-    PCollection<InputT> input = context.getInput();
-    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-    Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
-
-    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
-    Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
-    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
-      if (outputEntry.getKey() == transform.getMainOutputTag()) {
-        ports.put(outputEntry.getValue(), operator.output);
-      } else {
-        int portIndex = 0;
-        for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
-          if (tag == outputEntry.getKey()) {
-            ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
-            break;
-          }
-          portIndex++;
-        }
-      }
-    }
-    context.addOperator(operator, ports);
-    context.addStream(context.getInput(), operator.input);
-    if (!sideInputs.isEmpty()) {
-      addSideInputs(operator, sideInputs, context);
-    }
-  }
-
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
-    if (sideInputs.size() > sideInputPorts.length) {
-      PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
-      context.addStream(unionCollection, sideInputPorts[0]);
-    } else {
-      // the number of ports for side inputs is fixed and each port can only take one input.
-      for (int i = 0; i < sideInputs.size(); i++) {
-        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
-      }
-    }
-  }
-
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
-    checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
-    // flatten and assign union tag
-    List<PCollection<Object>> sourceCollections = new ArrayList<>();
-    Map<PCollection<?>, Integer> unionTags = new HashMap<>();
-    PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0));
-    for (int i = 0; i < sideInputs.size(); i++) {
-      PCollectionView<?> sideInput = sideInputs.get(i);
-      PCollection<?> sideInputCollection = context.getViewInput(sideInput);
-      if (!sideInputCollection.getWindowingStrategy().equals(
-          firstSideInput.getWindowingStrategy())) {
-        // TODO: check how to handle this in stream codec
-        //String msg = "Multiple side inputs with different window strategies.";
-        //throw new UnsupportedOperationException(msg);
-        LOG.warn("Side inputs union with different windowing strategies {} {}",
-            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
-      }
-      if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
-        String msg = "Multiple side inputs with different coders.";
-        throw new UnsupportedOperationException(msg);
-      }
-      sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput));
-      unionTags.put(sideInputCollection, i);
-    }
-
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
-        firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
-        context);
-    return resultCollection;
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
deleted file mode 100644
index 92567a6..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import java.util.List;
-
-import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/**
- * {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-public class ParDoBoundTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
-    PCollection<OutputT> output = context.getOutput();
-    PCollection<InputT> input = context.getInput();
-    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-    Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
-        output.getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
-    context.addOperator(operator, operator.output);
-    context.addStream(context.getInput(), operator.input);
-    if (!sideInputs.isEmpty()) {
-       ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
deleted file mode 100644
index 3097276..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.datatorrent.api.InputOperator;
-
-import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-
-/**
- * {@link Read.Unbounded} is translated to Apex {@link InputOperator}
- * that wraps {@link UnboundedSource}.
- */
-public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(Read.Unbounded<T> transform, TranslationContext context) {
-    UnboundedSource<T, ?> unboundedSource = transform.getSource();
-    ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-        unboundedSource, context.getPipelineOptions());
-    context.addOperator(operator, operator.output);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
deleted file mode 100644
index dfd2045..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-
-import java.io.Serializable;
-
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Translates {@link PTransform} to Apex functions.
- */
-public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable {
-  void translate(T transform, TranslationContext context);
-}