You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:50 UTC

[32/39] incubator-beam git commit: BEAM-261 Make translators package private.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
deleted file mode 100644
index 07c6494..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.api.Operator.OutputPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-/**
- * Maintains context data for {@link TransformTranslator}s.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class TranslationContext {
-
-  private final ApexPipelineOptions pipelineOptions;
-  private AppliedPTransform<?, ?, ?> currentTransform;
-  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
-  private final Map<String, Operator> operators = new HashMap<>();
-  private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
-
-  public void addView(PCollectionView<?> view) {
-    this.viewInputs.put(view, this.getInput());
-  }
-
-  public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
-    PInput input = this.viewInputs.get(view);
-    checkArgument(input != null, "unknown view " + view.getName());
-    return (InputT) input;
-  }
-
-  public TranslationContext(ApexPipelineOptions pipelineOptions) {
-    this.pipelineOptions = pipelineOptions;
-  }
-
-  public void setCurrentTransform(TransformTreeNode treeNode) {
-    this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-        treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform());
-  }
-
-  public ApexPipelineOptions getPipelineOptions() {
-    return pipelineOptions;
-  }
-
-  public <InputT extends PInput> InputT getInput() {
-    return (InputT) getCurrentTransform().getInput();
-  }
-
-  public <OutputT extends POutput> OutputT getOutput() {
-    return (OutputT) getCurrentTransform().getOutput();
-  }
-
-  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    checkArgument(currentTransform != null, "current transform not set");
-    return currentTransform;
-  }
-
-  public void addOperator(Operator operator, OutputPort port) {
-    addOperator(operator, port, this.<PCollection<?>>getOutput());
-  }
-
-  /**
-   * Register operator and output ports for the given collections.
-   * @param operator
-   * @param ports
-   */
-  public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) {
-    boolean first = true;
-    for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) {
-      if (first) {
-        addOperator(operator, portEntry.getValue(), portEntry.getKey());
-        first = false;
-      } else {
-        this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(),
-            new ArrayList<>()));
-      }
-    }
-  }
-
-  /**
-   * Add the operator with its output port for the given result {link PCollection}.
-   * @param operator
-   * @param port
-   * @param output
-   */
-  public void addOperator(Operator operator, OutputPort port, PCollection output) {
-    // Apex DAG requires a unique operator name
-    // use the transform's name and make it unique
-    String name = getCurrentTransform().getFullName();
-    for (int i = 1; this.operators.containsKey(name); i++) {
-      name = getCurrentTransform().getFullName() + i;
-    }
-    this.operators.put(name, operator);
-    this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>()));
-  }
-
-  public void addStream(PInput input, InputPort inputPort) {
-    Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
-    checkArgument(stream != null, "no upstream operator defined for %s", input);
-    stream.getRight().add(inputPort);
-  }
-
-  public void populateDAG(DAG dag) {
-    for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) {
-      dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue());
-    }
-    int streamIndex = 0;
-    for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
-        streams.entrySet()) {
-      List<InputPort<?>> sinksList = streamEntry.getValue().getRight();
-      InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]);
-      if (sinks.length > 0) {
-        dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks);
-        for (InputPort port : sinks) {
-          PCollection pc = streamEntry.getKey();
-          Coder coder = pc.getCoder();
-          if (pc.getWindowingStrategy() != null) {
-            coder = FullWindowedValueCoder.of(pc.getCoder(),
-                pc.getWindowingStrategy().getWindowFn().windowCoder()
-                );
-          }
-          Coder<Object> wrapperCoder = ApexStreamTuple.ApexStreamTupleCoder.of(coder);
-          CoderAdapterStreamCodec streamCodec = new CoderAdapterStreamCodec(wrapperCoder);
-          dag.setInputPortAttribute(port, PortContext.STREAM_CODEC, streamCodec);
-        }
-      }
-    }
-  }
-
-  /**
-   * Return the {@link StateInternalsFactory} for the pipeline translation.
-   * @return
-   */
-  public <K> StateInternalsFactory<K> stateInternalsFactory() {
-    return new ApexStateInternals.ApexStateInternalsFactory();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
deleted file mode 100644
index 703b1f4..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
- */
-public class ApexFlattenOperator<InputT> extends BaseOperator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
-  private boolean traceTuples = false;
-
-  private long inputWM1;
-  private long inputWM2;
-  private long outputWM;
-
-  public int data1Tag;
-  public int data2Tag;
-
-  /**
-   * Data input port 1.
-   */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
-    /**
-     * Emits to port "out"
-     */
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
-      if (tuple instanceof WatermarkTuple) {
-        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
-        if (wmTuple.getTimestamp() > inputWM1) {
-          inputWM1 = wmTuple.getTimestamp();
-          if (inputWM1 <= inputWM2) {
-            // move output watermark and emit it
-            outputWM = inputWM1;
-            if (traceTuples) {
-              LOG.debug("\nemitting watermark {}\n", outputWM);
-            }
-            out.emit(tuple);
-          }
-        }
-        return;
-      }
-      if (traceTuples) {
-        LOG.debug("\nemitting {}\n", tuple);
-      }
-
-      if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
-        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag);
-      }
-      out.emit(tuple);
-    }
-  };
-
-  /**
-   * Data input port 2.
-   */
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
-    /**
-     * Emits to port "out"
-     */
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
-      if (tuple instanceof WatermarkTuple) {
-        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple;
-        if (wmTuple.getTimestamp() > inputWM2) {
-          inputWM2 = wmTuple.getTimestamp();
-          if (inputWM2 <= inputWM1) {
-            // move output watermark and emit it
-            outputWM = inputWM2;
-            if (traceTuples) {
-              LOG.debug("\nemitting watermark {}\n", outputWM);
-            }
-            out.emit(tuple);
-          }
-        }
-        return;
-      }
-      if (traceTuples) {
-        LOG.debug("\nemitting {}\n", tuple);
-      }
-
-      if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) {
-        ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag);
-      }
-      out.emit(tuple);
-    }
-  };
-
-  /**
-   * Output port.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out =
-    new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
deleted file mode 100644
index 4c28c85..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link GroupByKey}.
- * This operator expects the input stream already partitioned by K,
- * which is determined by the {@link StreamCodec} on the input port.
- *
- * @param <K>
- * @param <V>
- */
-public class ApexGroupByKeyOperator<K, V> implements Operator {
-  private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
-  private boolean traceTuples = true;
-
-  @Bind(JavaSerializer.class)
-  private WindowingStrategy<V, BoundedWindow> windowingStrategy;
-  @Bind(JavaSerializer.class)
-  private Coder<K> keyCoder;
-  @Bind(JavaSerializer.class)
-  private Coder<V> valueCoder;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions serializedOptions;
-  @Bind(JavaSerializer.class)
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
-  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private transient ProcessContext context;
-  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
-  private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
-  private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) {
-      try {
-        if (t instanceof ApexStreamTuple.WatermarkTuple) {
-          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t;
-          processWatermark(mark);
-          if (traceTuples) {
-            LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
-          }
-          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(
-              mark.getTimestamp()));
-          return;
-        }
-        if (traceTuples) {
-          LOG.debug("\ninput {}\n", t.getValue());
-        }
-        processElement(t.getValue());
-      } catch (Exception e) {
-        Throwables.propagateIfPossible(e);
-        throw new RuntimeException(e);
-      }
-    }
-  };
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>>
-      output = new DefaultOutputPort<>();
-
-  @SuppressWarnings("unchecked")
-  public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input,
-      StateInternalsFactory<K> stateInternalsFactory) {
-    checkNotNull(pipelineOptions);
-    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy();
-    this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
-    this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
-    this.stateInternalsFactory = stateInternalsFactory;
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexGroupByKeyOperator() {
-    this.serializedOptions = null;
-    this.stateInternalsFactory = null;
-  }
-
-  @Override
-  public void beginWindow(long l) {
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
-    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
-        stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
-    this.context = new ProcessContext(fn, this.timerInternals);
-  }
-
-  @Override
-  public void teardown() {
-  }
-
-  /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
-   */
-  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(
-      long currentWatermark) {
-
-    // we keep the timers to return in a different list and launch them later
-    // because we cannot prevent a trigger from registering another trigger,
-    // which would lead to concurrent modification exception.
-    Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it =
-        activeTimers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-      while (timerIt.hasNext()) {
-        TimerInternals.TimerData timerData = timerIt.next();
-        if (timerData.getTimestamp().isBefore(currentWatermark)) {
-          toFire.put(keyWithTimers.getKey(), timerData);
-          timerIt.remove();
-        }
-      }
-
-      if (keyWithTimers.getValue().isEmpty()) {
-        it.remove();
-      }
-    }
-    return toFire;
-  }
-
-  private void processElement(WindowedValue<KV<K, V>> windowedValue) throws Exception {
-    final KV<K, V> kv = windowedValue.getValue();
-    final WindowedValue<V> updatedWindowedValue = WindowedValue.of(kv.getValue(),
-        windowedValue.getTimestamp(),
-        windowedValue.getWindows(),
-        windowedValue.getPane());
-
-    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
-            kv.getKey(),
-            Collections.singletonList(updatedWindowedValue));
-
-    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-    fn.processElement(context);
-  }
-
-  private StateInternals<K> getStateInternalsForKey(K key) {
-    final ByteBuffer keyBytes;
-    try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
-    if (stateInternals == null) {
-      stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-      perKeyStateInternals.put(keyBytes, stateInternals);
-    }
-    return stateInternals;
-  }
-
-  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes;
-    try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
-    }
-    timersForKey.add(timer);
-    activeTimers.put(keyBytes, timersForKey);
-  }
-
-  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    final ByteBuffer keyBytes;
-    try {
-      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
-    if (timersForKey != null) {
-      timersForKey.remove(timer);
-      if (timersForKey.isEmpty()) {
-        activeTimers.remove(keyBytes);
-      } else {
-        activeTimers.put(keyBytes, timersForKey);
-      }
-    }
-  }
-
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
-    this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(
-        mark.getTimestamp());
-    if (!timers.isEmpty()) {
-      for (ByteBuffer keyBytes : timers.keySet()) {
-        K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
-        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
-        context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-        fn.processElement(context);
-      }
-    }
-  }
-
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
-      KeyedWorkItem<K, V>>.ProcessContext {
-
-    private final ApexTimerInternals timerInternals;
-    private StateInternals<K> stateInternals;
-    private KeyedWorkItem<K, V> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
-                          ApexTimerInternals timerInternals) {
-      function.super();
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    @Override
-    public KeyedWorkItem<K, V> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException(
-          "timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.get();
-    }
-
-    @Override
-    public void output(KV<K, Iterable<V>> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
-
-        @Override
-        public StateInternals<K> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp,
-            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          if (traceTuples) {
-            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
-          }
-          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(
-              WindowedValue.of(output, timestamp, windows, pane)));
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
-            Coder<T> elemCoder) throws IOException {
-          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // ignore the side output, this can happen when a user does not register
-      // side outputs but then outputs using a freshly created TupleTag.
-      throw new RuntimeException("sideOutput() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  /**
-   * An implementation of Beam's {@link TimerInternals}.
-   *
-   */
-  public class ApexTimerInternals implements TimerInternals {
-
-    @Override
-    public void setTimer(TimerData timerKey) {
-      registerActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return Instant.now();
-    }
-
-    @Override
-    public Instant currentSynchronizedProcessingTime() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return inputWatermark;
-    }
-
-    @Override
-    public Instant currentOutputWatermarkTime() {
-      // TODO Auto-generated method stub
-      return null;
-    }
-
-    @Override
-    public void setTimer(StateNamespace namespace, String timerId, Instant target,
-        TimeDomain timeDomain) {
-      throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
-    }
-
-    @Override
-    public void deleteTimer(StateNamespace namespace, String timerId) {
-      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
-    }
-
-  }
-
-  private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return getStateInternalsForKey(key);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
deleted file mode 100644
index 43384d6..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.functions;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
-import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
-import org.apache.beam.runners.core.SideInputHandler;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.NullSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex operator for Beam {@link DoFn}.
- */
-public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
-  private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
-  private boolean traceTuples = true;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions pipelineOptions;
-  @Bind(JavaSerializer.class)
-  private final OldDoFn<InputT, OutputT> doFn;
-  @Bind(JavaSerializer.class)
-  private final TupleTag<OutputT> mainOutputTag;
-  @Bind(JavaSerializer.class)
-  private final List<TupleTag<?>> sideOutputTags;
-  @Bind(JavaSerializer.class)
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  @Bind(JavaSerializer.class)
-  private final List<PCollectionView<?>> sideInputs;
-
-  private final StateInternals<Void> sideInputStateInternals;
-  private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
-  private LongMin pushedBackWatermark = new LongMin();
-  private long currentInputWatermark = Long.MIN_VALUE;
-  private long currentOutputWatermark = currentInputWatermark;
-
-  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
-  private transient SideInputHandler sideInputHandler;
-  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
-      Maps.newHashMapWithExpectedSize(5);
-
-  public ApexParDoOperator(
-      ApexPipelineOptions pipelineOptions,
-      OldDoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      WindowingStrategy<?, ?> windowingStrategy,
-      List<PCollectionView<?>> sideInputs,
-      Coder<WindowedValue<InputT>> inputCoder,
-      StateInternalsFactory<Void> stateInternalsFactory
-      ) {
-    this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
-    this.doFn = doFn;
-    this.mainOutputTag = mainOutputTag;
-    this.sideOutputTags = sideOutputTags;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-    this.sideInputStateInternals = stateInternalsFactory.stateInternalsForKey(null);
-
-    if (sideOutputTags.size() > sideOutputPorts.length) {
-      String msg = String.format("Too many side outputs (currently only supporting %s).",
-          sideOutputPorts.length);
-      throw new UnsupportedOperationException(msg);
-    }
-
-    Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
-    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
-        coder);
-
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexParDoOperator() {
-    this.pipelineOptions = null;
-    this.doFn = null;
-    this.mainOutputTag = null;
-    this.sideOutputTags = null;
-    this.windowingStrategy = null;
-    this.sideInputs = null;
-    this.pushedBack = null;
-    this.sideInputStateInternals = null;
-  }
-
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<InputT>> t) {
-      if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        processWatermark((ApexStreamTuple.WatermarkTuple<?>) t);
-      } else {
-        if (traceTuples) {
-          LOG.debug("\ninput {}\n", t.getValue());
-        }
-        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
-        for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-          pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-          pushedBack.get().add(pushedBackValue);
-        }
-      }
-    }
-  };
-
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 =
-      new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() {
-    @Override
-    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) {
-      if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        // ignore side input watermarks
-        return;
-      }
-
-      int sideInputIndex = 0;
-      if (t instanceof ApexStreamTuple.DataTuple) {
-        sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag();
-      }
-
-      if (traceTuples) {
-        LOG.debug("\nsideInput {} {}\n", sideInputIndex, t.getValue());
-      }
-
-      PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
-      sideInputHandler.addSideInputValue(sideInput, t.getValue());
-
-      List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-      for (WindowedValue<InputT> elem : pushedBack.get()) {
-        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
-        Iterables.addAll(newPushedBack, justPushedBack);
-      }
-
-      pushedBack.get().clear();
-      pushedBackWatermark.clear();
-      for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-        pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-        pushedBack.get().add(pushedBackValue);
-      }
-
-      // potentially emit watermark
-      processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
-    }
-  };
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
-
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 =
-      new DefaultOutputPort<>();
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 =
-      new DefaultOutputPort<>();
-
-  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2,
-      sideOutput3, sideOutput4, sideOutput5};
-
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
-    DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
-    if (sideOutputPort != null) {
-      sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
-    } else {
-      output.emit(ApexStreamTuple.DataTuple.of(tuple));
-    }
-    if (traceTuples) {
-      LOG.debug("\nemitting {}\n", tuple);
-    }
-  }
-
-  private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
-    try {
-      pushbackDoFnRunner.startBundle();
-      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
-          .processElementInReadyWindows(elem);
-      pushbackDoFnRunner.finishBundle();
-      return pushedBack;
-    } catch (UserCodeException ue) {
-      if (ue.getCause() instanceof AssertionError) {
-        ApexRunner.assertionError = (AssertionError) ue.getCause();
-      }
-      throw ue;
-    }
-  }
-
-  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
-    this.currentInputWatermark = mark.getTimestamp();
-
-    if (sideInputs.isEmpty()) {
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", mark);
-      }
-      output.emit(mark);
-      return;
-    }
-
-    long potentialOutputWatermark =
-        Math.min(pushedBackWatermark.get(), currentInputWatermark);
-    if (potentialOutputWatermark > currentOutputWatermark) {
-      currentOutputWatermark = potentialOutputWatermark;
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
-      }
-      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
-    }
-  }
-
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
-    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
-    if (!sideInputs.isEmpty()) {
-      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
-      sideInputReader = sideInputHandler;
-    }
-
-    for (int i = 0; i < sideOutputTags.size(); i++) {
-      @SuppressWarnings("unchecked")
-      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)
-          sideOutputPorts[i];
-      sideOutputPortMapping.put(sideOutputTags.get(i), port);
-    }
-
-    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
-        pipelineOptions.get(),
-        doFn,
-        sideInputReader,
-        this,
-        mainOutputTag,
-        sideOutputTags,
-        new NoOpStepContext(),
-        new NoOpAggregatorFactory(),
-        windowingStrategy
-        );
-
-    pushbackDoFnRunner =
-        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
-
-    try {
-      doFn.setup();
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    }
-
-  }
-
-  @Override
-  public void beginWindow(long windowId) {
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  /**
-   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
-   * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
-   */
-  public static class NoOpAggregatorFactory implements AggregatorFactory {
-
-    private NoOpAggregatorFactory() {
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext step,
-        String name, CombineFn<InputT, AccumT, OutputT> combine) {
-      return new NoOpAggregator<InputT, OutputT>();
-    }
-
-    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>,
-        java.io.Serializable {
-      private static final long serialVersionUID = 1L;
-
-      @Override
-      public void addValue(InputT value) {
-      }
-
-      @Override
-      public String getName() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-      @Override
-      public CombineFn<InputT, ?, OutputT> getCombineFn() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-    };
-  }
-
-  private static class LongMin {
-    long state = Long.MAX_VALUE;
-
-    public void add(long l) {
-      state = Math.min(state, l);
-    }
-
-    public long get() {
-      return state;
-    }
-
-    public void clear() {
-      state = Long.MAX_VALUE;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
deleted file mode 100644
index ecb0adb..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the Beam runner for Apache Apex.
- */
-package org.apache.beam.runners.apex.translators.functions;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
deleted file mode 100644
index 61236ca..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators.io;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
-
-import java.io.IOException;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
-import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Apex input operator that wraps Beam {@link UnboundedSource}.
- */
-public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
-    extends UnboundedSource.CheckpointMark> implements InputOperator {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      ApexReadUnboundedInputOperator.class);
-  private boolean traceTuples = false;
-  private long outputWatermark = 0;
-
-  @Bind(JavaSerializer.class)
-  private final SerializablePipelineOptions pipelineOptions;
-  @Bind(JavaSerializer.class)
-  private final UnboundedSource<OutputT, CheckpointMarkT> source;
-  private final boolean isBoundedSource;
-  private transient UnboundedSource.UnboundedReader<OutputT> reader;
-  private transient boolean available = false;
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
-      new DefaultOutputPort<>();
-
-  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
-      ApexPipelineOptions options) {
-    this.pipelineOptions = new SerializablePipelineOptions(options);
-    this.source = source;
-    this.isBoundedSource = false;
-  }
-
-  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
-      boolean isBoundedSource, ApexPipelineOptions options) {
-    this.pipelineOptions = new SerializablePipelineOptions(options);
-    this.source = source;
-    this.isBoundedSource = isBoundedSource;
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ApexReadUnboundedInputOperator() {
-    this.pipelineOptions = null; this.source = null; this.isBoundedSource = false;
-  }
-
-  @Override
-  public void beginWindow(long windowId) {
-    if (!available && (isBoundedSource || source instanceof ValuesSource)) {
-      // if it's a Create and the input was consumed, emit final watermark
-      emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
-      // terminate the stream (allows tests to finish faster)
-      BaseOperator.shutdown();
-    } else {
-      emitWatermarkIfNecessary(reader.getWatermark().getMillis());
-    }
-  }
-
-  private void emitWatermarkIfNecessary(long mark) {
-    if (mark > outputWatermark) {
-      outputWatermark = mark;
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", mark);
-      }
-      output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
-    }
-  }
-
-  @Override
-  public void endWindow() {
-  }
-
-  @Override
-  public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
-    try {
-      reader = source.createReader(this.pipelineOptions.get(), null);
-      available = reader.start();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void teardown() {
-    try {
-      if (reader != null) {
-        reader.close();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void emitTuples() {
-    try {
-      if (!available) {
-        available = reader.advance();
-      }
-      if (available) {
-        OutputT data = reader.getCurrent();
-        Instant timestamp = reader.getCurrentTimestamp();
-        available = reader.advance();
-        if (traceTuples) {
-          LOG.debug("\nemitting '{}' timestamp {}\n", data, timestamp);
-        }
-        output.emit(DataTuple.of(WindowedValue.of(
-            data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
-      }
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
deleted file mode 100644
index fadf8ec4..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-
-/**
- * Unbounded source that reads from a Java {@link Iterable}.
- */
-public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-  private static final long serialVersionUID = 1L;
-
-  private final byte[] codedValues;
-  private final IterableCoder<T> iterableCoder;
-
-  public ValuesSource(Iterable<T> values, Coder<T> coder) {
-    this.iterableCoder = IterableCoder.of(coder);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    try {
-      iterableCoder.encode(values, bos, Context.OUTER);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-    this.codedValues = bos.toByteArray();
-  }
-
-  @Override
-  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.singletonList(this);
-  }
-
-  @Override
-  public UnboundedReader<T> createReader(PipelineOptions options,
-      @Nullable CheckpointMark checkpointMark) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
-    try {
-      Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
-      return new ValuesReader<>(values, this);
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-  }
-
-  @Nullable
-  @Override
-  public Coder<CheckpointMark> getCheckpointMarkCoder() {
-    return null;
-  }
-
-  @Override
-  public void validate() {
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return iterableCoder.getElemCoder();
-  }
-
-  private static class ValuesReader<T> extends UnboundedReader<T> {
-
-    private final Iterable<T> values;
-    private final UnboundedSource<T, CheckpointMark> source;
-    private transient Iterator<T> iterator;
-    private T current;
-
-    public ValuesReader(Iterable<T> values, UnboundedSource<T, CheckpointMark> source) {
-      this.values = values;
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      if (null == iterator) {
-        iterator = values.iterator();
-      }
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (iterator.hasNext()) {
-        current = iterator.next();
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      return current;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return Instant.now();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return Instant.now();
-    }
-
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<T, ?> getCurrentSource() {
-      return source;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
deleted file mode 100644
index 0d17f19..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the Beam runner for Apache Apex.
- */
-package org.apache.beam.runners.apex.translators.io;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
deleted file mode 100644
index 7d7c6cc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the Beam runner for Apache Apex.
- */
-package org.apache.beam.runners.apex.translators;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
deleted file mode 100644
index edc1220..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStateInternals.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.joda.time.Instant;
-
-/**
- * Implementation of {@link StateInternals} that can be serialized and
- * checkpointed with the operator. Suitable for small states, in the future this
- * should be based on the incremental state saving components in the Apex
- * library.
- */
-@DefaultSerializer(JavaSerializer.class)
-public class ApexStateInternals<K> implements StateInternals<K>, Serializable {
-  private static final long serialVersionUID = 1L;
-  public static <K> ApexStateInternals<K> forKey(K key) {
-    return new ApexStateInternals<>(key);
-  }
-
-  private final K key;
-
-  protected ApexStateInternals(K key) {
-    this.key = key;
-  }
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  /**
-   * Serializable state for internals (namespace -> state tag -> coded value).
-   */
-  private final Table<String, String, byte[]> stateTable = HashBasedTable.create();
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) {
-    return address.bind(new ApexStateBinder(key, namespace, address, c));
-  }
-
-  /**
-   * A {@link StateBinder} that returns {@link State} wrappers for serialized state.
-   */
-  private class ApexStateBinder implements StateBinder<K> {
-    private final K key;
-    private final StateNamespace namespace;
-    private final StateContext<?> c;
-
-    private ApexStateBinder(K key, StateNamespace namespace, StateTag<? super K, ?> address,
-        StateContext<?> c) {
-      this.key = key;
-      this.namespace = namespace;
-      this.c = c;
-    }
-
-    @Override
-    public <T> ValueState<T> bindValue(
-        StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-      return new ApexValueState<T>(namespace, address, coder);
-    }
-
-    @Override
-    public <T> BagState<T> bindBag(
-        final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-      return new ApexBagState<T>(namespace, address, elemCoder);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            final CombineFn<InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>(
-          namespace,
-          address,
-          accumCoder,
-          key,
-          combineFn.<K>asKeyedFn()
-          );
-    }
-
-    @Override
-    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      return new ApexWatermarkHoldState<W>(namespace, address, outputTimeFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      return new ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>(
-          namespace,
-          address,
-          accumCoder,
-          key, combineFn);
-    }
-
-    @Override
-    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
-        bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-      return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
-    }
-  }
-
-  private class AbstractState<T> {
-    protected final StateNamespace namespace;
-    protected final StateTag<?, ? extends State> address;
-    protected final Coder<T> coder;
-
-    private AbstractState(
-        StateNamespace namespace,
-        StateTag<?, ? extends State> address,
-        Coder<T> coder) {
-      this.namespace = namespace;
-      this.address = address;
-      this.coder = coder;
-    }
-
-    protected T readValue() {
-      T value = null;
-      byte[] buf = stateTable.get(namespace.stringKey(), address.getId());
-      if (buf != null) {
-        // TODO: reuse input
-        Input input = new Input(buf);
-        try {
-          return coder.decode(input, Context.OUTER);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      return value;
-    }
-
-    public void writeValue(T input) {
-      ByteArrayOutputStream output = new ByteArrayOutputStream();
-      try {
-        coder.encode(input, output, Context.OUTER);
-        stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void clear() {
-      stateTable.remove(namespace.stringKey(), address.getId());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      @SuppressWarnings("unchecked")
-      AbstractState<?> that = (AbstractState<?>) o;
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private class ApexValueState<T> extends AbstractState<T> implements ValueState<T> {
-
-    private ApexValueState(
-        StateNamespace namespace,
-        StateTag<?, ValueState<T>> address,
-        Coder<T> coder) {
-      super(namespace, address, coder);
-    }
-
-    @Override
-    public ApexValueState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      return readValue();
-    }
-
-    @Override
-    public void write(T input) {
-      writeValue(input);
-    }
-  }
-
-  private final class ApexWatermarkHoldState<W extends BoundedWindow>
-      extends AbstractState<Instant> implements WatermarkHoldState<W> {
-
-    private final OutputTimeFn<? super W> outputTimeFn;
-
-    public ApexWatermarkHoldState(
-        StateNamespace namespace,
-        StateTag<?, WatermarkHoldState<W>> address,
-        OutputTimeFn<? super W> outputTimeFn) {
-      super(namespace, address, InstantCoder.of());
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public ApexWatermarkHoldState<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public Instant read() {
-      return readValue();
-    }
-
-    @Override
-    public void add(Instant outputTime) {
-      Instant combined = read();
-      combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
-      writeValue(combined);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return stateTable.get(namespace.stringKey(), address.getId()) == null;
-        }
-      };
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-  }
-
-  private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      extends AbstractState<AccumT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
-    private final K key;
-    private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-
-    private ApexAccumulatorCombiningState(StateNamespace namespace,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Coder<AccumT> coder,
-        K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-      super(namespace, address, coder);
-      this.key = key;
-      this.combineFn = combineFn;
-    }
-
-    @Override
-    public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, getAccum());
-    }
-
-    @Override
-    public void add(InputT input) {
-      AccumT accum = getAccum();
-      combineFn.addInput(key, accum, input);
-      writeValue(accum);
-    }
-
-    @Override
-    public AccumT getAccum() {
-      AccumT accum = readValue();
-      if (accum == null) {
-        accum = combineFn.createAccumulator(key);
-      }
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-        @Override
-        public Boolean read() {
-          return stateTable.get(namespace.stringKey(), address.getId()) == null;
-        }
-      };
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      accum = combineFn.mergeAccumulators(key, Arrays.asList(getAccum(), accum));
-      writeValue(accum);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators);
-    }
-
-  }
-
-  private final class ApexBagState<T> extends AbstractState<List<T>> implements BagState<T> {
-    private ApexBagState(
-        StateNamespace namespace,
-        StateTag<?, BagState<T>> address,
-        Coder<T> coder) {
-      super(namespace, address, ListCoder.of(coder));
-    }
-
-    @Override
-    public ApexBagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public List<T> read() {
-      List<T> value = super.readValue();
-      if (value == null) {
-        value = new ArrayList<T>();
-      }
-      return value;
-    }
-
-    @Override
-    public void add(T input) {
-      List<T> value = read();
-      value.add(input);
-      writeValue(value);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return stateTable.get(namespace.stringKey(), address.getId()) == null;
-        }
-      };
-    }
-  }
-
-  /**
-   * Factory for {@link ApexStateInternals}.
-   *
-   * @param <K>
-   */
-  public static class ApexStateInternalsFactory<K>
-      implements StateInternalsFactory<K>, Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return ApexStateInternals.forKey(key);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
deleted file mode 100644
index 25518dc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.datatorrent.api.Operator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-
-/**
- * The common interface for all objects transmitted through streams.
- *
- * @param <T> The actual payload type.
- */
-public interface ApexStreamTuple<T> {
-  /**
-   * Gets the value of the tuple.
-   *
-   * @return
-   */
-  T getValue();
-
-  /**
-   * Data tuple class.
-   *
-   * @param <T>
-   */
-  class DataTuple<T> implements ApexStreamTuple<T> {
-    private int unionTag;
-    private T value;
-
-    public static <T> DataTuple<T> of(T value) {
-      return new DataTuple<>(value, 0);
-    }
-
-    private DataTuple(T value, int unionTag) {
-      this.value = value;
-      this.unionTag = unionTag;
-    }
-
-    @Override
-    public T getValue() {
-      return value;
-    }
-
-    public void setValue(T value) {
-      this.value = value;
-    }
-
-    public int getUnionTag() {
-      return unionTag;
-    }
-
-    public void setUnionTag(int unionTag) {
-      this.unionTag = unionTag;
-    }
-
-    @Override
-    public String toString() {
-      return value.toString();
-    }
-
-  }
-
-  /**
-   * Tuple that includes a timestamp.
-   *
-   * @param <T>
-   */
-  class TimestampedTuple<T> extends DataTuple<T> {
-    private long timestamp;
-
-    public TimestampedTuple(long timestamp, T value) {
-      super(value, 0);
-      this.timestamp = timestamp;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof TimestampedTuple)) {
-        return false;
-      } else {
-        TimestampedTuple<?> other = (TimestampedTuple<?>) obj;
-        return (timestamp == other.timestamp) && Objects.equals(this.getValue(), other.getValue());
-      }
-    }
-
-  }
-
-  /**
-   * Tuple that represents a watermark.
-   *
-   * @param <T>
-   */
-  class WatermarkTuple<T> extends TimestampedTuple<T> {
-    public static <T> WatermarkTuple<T> of(long timestamp) {
-      return new WatermarkTuple<>(timestamp);
-    }
-
-    protected WatermarkTuple(long timestamp) {
-      super(timestamp, null);
-    }
-
-    @Override
-    public String toString() {
-      return "[Watermark " + getTimestamp() + "]";
-    }
-  }
-
-  /**
-   * Coder for {@link ApexStreamTuple}.
-   */
-  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
-    private static final long serialVersionUID = 1L;
-    final Coder<T> valueCoder;
-
-    public static <T> ApexStreamTupleCoder<T> of(Coder<T> valueCoder) {
-      return new ApexStreamTupleCoder<>(valueCoder);
-    }
-
-    protected ApexStreamTupleCoder(Coder<T> valueCoder) {
-      this.valueCoder = checkNotNull(valueCoder);
-    }
-
-    @Override
-    public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      if (value instanceof WatermarkTuple) {
-        outStream.write(1);
-        new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
-      } else {
-        outStream.write(0);
-        outStream.write(((DataTuple<?>) value).unionTag);
-        valueCoder.encode(value.getValue(), outStream, context);
-      }
-    }
-
-    @Override
-    public ApexStreamTuple<T> decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      int b = inStream.read();
-      if (b == 1) {
-        return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
-      } else {
-        int unionTag = inStream.read();
-        return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
-      }
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.<Coder<?>>asList(valueCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          this.getClass().getSimpleName() + " requires a deterministic valueCoder",
-          valueCoder);
-    }
-
-    /**
-     * Returns the value coder.
-     */
-    public Coder<T> getValueCoder() {
-      return valueCoder;
-    }
-
-  }
-
-  /**
-   * Central if data tuples received on and emitted from ports should be logged.
-   * Should be called in setup and value cached in operator.
-   */
-  final class Logging {
-    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
-      return options.isTupleTracingEnabled();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
deleted file mode 100644
index 61e3b83..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
-/**
- * The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
- */
-public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
-  private static final long serialVersionUID = 1L;
-  private final Coder<? super Object> coder;
-
-  public CoderAdapterStreamCodec(Coder<? super Object> coder) {
-    this.coder = coder;
-  }
-
-  @Override
-  public Object fromByteArray(Slice fragment) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset,
-        fragment.length);
-    try {
-      return coder.decode(bis, Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Slice toByteArray(Object wv) {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    try {
-      coder.encode(wv, bos, Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return new Slice(bos.toByteArray());
-  }
-
-  @Override
-  public int getPartition(Object o) {
-    return o.hashCode();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
deleted file mode 100644
index 3b19c37..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Serializable {@link ExecutionContext.StepContext} that does nothing.
- */
-public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public String getStepName() {
-    return null;
-  }
-
-  @Override
-  public String getTransformName() {
-    return null;
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> output) {
-  }
-
-  @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
-  }
-
-  @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
-      IOException {
-
-  }
-
-  @Override
-  public StateInternals<?> stateInternals() {
-    return null;
-  }
-
-  @Override
-  public TimerInternals timerInternals() {
-    return null;
-  }
-}