Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:49 UTC

[31/39] incubator-beam git commit: BEAM-261 Make translators package private.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
deleted file mode 100644
index d32b869..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A wrapper to enable serialization of {@link PipelineOptions}.
- */
-public class SerializablePipelineOptions implements Externalizable {
-
-  private transient ApexPipelineOptions pipelineOptions;
-
-  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
-    this.pipelineOptions = pipelineOptions;
-  }
-
-  public SerializablePipelineOptions() {
-  }
-
-  public ApexPipelineOptions get() {
-    return this.pipelineOptions;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-    String s = in.readUTF();
-    this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
-        .as(ApexPipelineOptions.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
deleted file mode 100644
index c06c500..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
-
-/**
- * A {@link KryoSerializable} holder that uses the specified {@link Coder}.
- * @param <T>
- */
-public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
-  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
-  private T value;
-  private Coder<T> coder;
-
-  public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
-    this.value = value;
-    this.coder = coder;
-  }
-
-  @SuppressWarnings("unused") // for Kryo
-  private ValueAndCoderKryoSerializable() {
-  }
-
-  public T get() {
-    return value;
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output) {
-    try {
-      kryo.writeClass(output, coder.getClass());
-      kryo.writeObject(output, coder, JAVA_SERIALIZER);
-      coder.encode(value, output, Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void read(Kryo kryo, Input input) {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<Coder<T>> type = kryo.readClass(input).getType();
-      coder = kryo.readObject(input, type, JAVA_SERIALIZER);
-      value = coder.decode(input, Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
deleted file mode 100644
index 4aeba35..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of the Beam runner for Apache Apex.
- */
-package org.apache.beam.runners.apex.translators.utils;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
new file mode 100644
index 0000000..c0ddb83
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.Sink;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ApexGroupByKeyOperator}.
+ */
+public class ApexGroupByKeyOperatorTest {
+
+  @Test
+  public void testGlobalWindowMinTimestamp() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
+        Duration.standardSeconds(10)));
+    PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
+        ws, IsBounded.BOUNDED);
+    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
+        input, new ApexStateInternals.ApexStateInternalsFactory<String>()
+        );
+
+    final List<Object> results = Lists.newArrayList();
+    Sink<Object> sink = new Sink<Object>() {
+      @Override
+      public void put(Object tuple) {
+        results.add(tuple);
+      }
+      @Override
+      public int getCount(boolean reset) {
+        return 0;
+      }
+    };
+    operator.output.setSink(sink);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    BoundedWindow window = new IntervalWindow(windowStart, windowStart.plus(10000));
+    PaneInfo paneInfo = PaneInfo.NO_FIRING;
+
+    WindowedValue<KV<String, Integer>> wv1 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv1));
+
+    WindowedValue<KV<String, Integer>> wv2 =
+        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+
+    ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark =
+        ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+    Assert.assertEquals("number outputs", 0, results.size());
+    operator.input.process(watermark);
+    Assert.assertEquals("number outputs", 2, results.size());
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> dataTuple =
+        (ApexStreamTuple.DataTuple) results.get(0);
+    List<Integer> counts = Lists.newArrayList(1, 1);
+    Assert.assertEquals("iterable", KV.of("foo", counts), dataTuple.getValue().getValue());
+    Assert.assertEquals("expected watermark", watermark, results.get(1));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
new file mode 100644
index 0000000..6b62a58
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link FlattenPCollectionTranslator}.
+ */
+public class FlattenPCollectionTranslatorTest {
+  private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class);
+
+  @Test
+  public void test() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setApplicationName("FlattenPCollection");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    String[][] collections = {
+        {"1"}, {"2"}, {"3"}, {"4"}, {"5"}
+    };
+
+    Set<String> expected = Sets.newHashSet();
+    List<PCollection<String>> pcList = new ArrayList<PCollection<String>>();
+    for (String[] collection : collections) {
+      pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of())));
+      expected.addAll(Arrays.asList(collection));
+    }
+
+    PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections());
+    actual.apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    // TODO: verify translation
+    result.getApexDAG();
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout
+        && EmbeddedCollector.RESULTS.size() < expected.size()) {
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(500);
+    }
+
+    Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
+    Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    protected static final ArrayList<Object> RESULTS = new ArrayList<>();
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
new file mode 100644
index 0000000..d627cd9
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link GroupByKeyTranslator}.
+ */
+public class GroupByKeyTranslatorTest {
+
+  @SuppressWarnings({"unchecked"})
+  @Test
+  public void test() throws Exception {
+    ApexPipelineOptions options =
+        PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setApplicationName("GroupByKey");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    List<KV<String, Instant>> data =
+        Lists.newArrayList(
+            KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)),
+            KV.of("foo", new Instant(2000)),
+            KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)),
+            KV.of("bar", new Instant(2000))
+        );
+
+    // expected results assume outputAtLatestInputTimestamp
+    List<KV<Instant, KV<String, Long>>> expected =
+        Lists.newArrayList(
+            KV.of(new Instant(1000), KV.of("foo", 2L)),
+            KV.of(new Instant(1000), KV.of("bar", 1L)),
+            KV.of(new Instant(2000), KV.of("foo", 1L)),
+            KV.of(new Instant(2000), KV.of("bar", 2L))
+        );
+
+    p.apply(Read.from(new TestSource(data, new Instant(5000))))
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
+        .apply(Count.<String>perElement())
+        .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
+        .apply(ParDo.of(new EmbeddedCollector()))
+        ;
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    result.getApexDAG();
+
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+
+  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(KV.of(c.timestamp(), c.element()));
+    }
+  }
+
+  private static class TestSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
+
+    private final List<KV<String, Instant>> data;
+    private final Instant watermark;
+
+    public TestSource(List<KV<String, Instant>> data, Instant watermark) {
+      this.data = data;
+      this.watermark = watermark;
+    }
+
+    @Override
+    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
+    }
+
+    @Override
+    public UnboundedReader<String> createReader(PipelineOptions options,
+        @Nullable CheckpointMark checkpointMark) {
+      return new TestReader(data, watermark, this);
+    }
+
+    @Nullable
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return null;
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+
+    private static class TestReader extends UnboundedReader<String> implements Serializable {
+
+      private static final long serialVersionUID = 7526472295622776147L;
+
+      private final List<KV<String, Instant>> data;
+      private final TestSource source;
+
+      private Iterator<KV<String, Instant>> iterator;
+      private String currentRecord;
+      private Instant currentTimestamp;
+      private Instant watermark;
+      private boolean collected;
+
+      public TestReader(List<KV<String, Instant>> data, Instant watermark, TestSource source) {
+        this.data = data;
+        this.source = source;
+        this.watermark = watermark;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        iterator = data.iterator();
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (iterator.hasNext()) {
+          KV<String, Instant> kv = iterator.next();
+          collected = false;
+          currentRecord = kv.getKey();
+          currentTimestamp = kv.getValue();
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public byte[] getCurrentRecordId() throws NoSuchElementException {
+        return new byte[0];
+      }
+
+      @Override
+      public String getCurrent() throws NoSuchElementException {
+        collected = true;
+        return this.currentRecord;
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return currentTimestamp;
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public Instant getWatermark() {
+        if (!iterator.hasNext() && collected) {
+          return watermark;
+        } else {
+          return new Instant(0);
+        }
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return null;
+      }
+
+      @Override
+      public UnboundedSource<String, ?> getCurrentSource() {
+        return this.source;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
new file mode 100644
index 0000000..2e86152
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Sink;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
+import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link ParDoBoundTranslator}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoBoundTranslatorTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class);
+  private static final long SLEEP_MILLIS = 500;
+  private static final long TIMEOUT_MILLIS = 30000;
+
+  @Test
+  public void test() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setApplicationName("ParDoBound");
+    options.setRunner(ApexRunner.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    List<Integer> collection = Lists.newArrayList(1, 2, 3, 4, 5);
+    List<Integer> expected = Lists.newArrayList(6, 7, 8, 9, 10);
+    p.apply(Create.of(collection).withCoder(SerializableCoder.of(Integer.class)))
+        .apply(ParDo.of(new Add(5)))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    DAG dag = result.getApexDAG();
+
+    DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class);
+
+    om = dag.getOperatorMeta("ParDo(Add)");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class);
+
+    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(SLEEP_MILLIS);
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+  }
+
+  @SuppressWarnings("serial")
+  private static class Add extends OldDoFn<Integer, Integer> {
+    private Integer number;
+    private PCollectionView<Integer> sideInputView;
+
+    private Add(Integer number) {
+      this.number = number;
+    }
+
+    private Add(PCollectionView<Integer> sideInputView) {
+      this.sideInputView = sideInputView;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      if (sideInputView != null) {
+        number = c.sideInput(sideInputView);
+      }
+      c.output(c.element() + number);
+    }
+  }
+
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    private static final long serialVersionUID = 1L;
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
+
+    public EmbeddedCollector() {
+      RESULTS.clear();
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+
+  private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
+    // We cannot use thrown.expect(AssertionError.class) because the AssertionError
+    // is first caught by JUnit and causes a test failure.
+    try {
+      pipeline.run();
+    } catch (AssertionError exc) {
+      return exc;
+    }
+    fail("assertion should have failed");
+    throw new RuntimeException("unreachable");
+  }
+
+  @Test
+  public void testAssertionFailure() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<Integer> pcollection = pipeline
+        .apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3, 7);
+
+    Throwable exc = runExpectingAssertionFailure(pipeline);
+    Pattern expectedPattern = Pattern.compile(
+        "Expected: iterable over \\[((<4>|<7>|<3>|<2>|<1>)(, )?){5}\\] in any order");
+    // A loose pattern, but should get the job done.
+    assertTrue(
+        "Expected error message from PAssert with substring matching "
+            + expectedPattern
+            + " but the message was \""
+            + exc.getMessage()
+            + "\"",
+        expectedPattern.matcher(exc.getMessage()).find());
+  }
+
+  @Test
+  public void testContainsInAnyOrder() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
+    // TODO: terminate faster based on processed assertion vs. auto-shutdown
+    pipeline.run();
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of());
+
+    PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
+        new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
+        WindowingStrategy.globalDefault(),
+        Collections.<PCollectionView<?>>singletonList(singletonView),
+        coder,
+        new ApexStateInternals.ApexStateInternalsFactory<Void>()
+        );
+    operator.setup(null);
+    operator.beginWindow(0);
+    WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow(
+        Lists.<Integer>newArrayList(22));
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input
+
+    final List<Object> results = Lists.newArrayList();
+    Sink<Object> sink = new Sink<Object>() {
+      @Override
+      public void put(Object tuple) {
+        results.add(tuple);
+      }
+      @Override
+      public int getCount(boolean reset) {
+        return 0;
+      }
+    };
+
+    // verify pushed back input checkpointing
+    Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+    operator.output.setSink(sink);
+    operator.setup(null);
+    operator.beginWindow(1);
+    WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2);
+    operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput));
+    Assert.assertEquals("number outputs", 1, results.size());
+    Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(23),
+        ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
+
+    // verify side input checkpointing
+    results.clear();
+    Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator));
+    operator.output.setSink(sink);
+    operator.setup(null);
+    operator.beginWindow(2);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
+    Assert.assertEquals("number outputs", 1, results.size());
+    Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24),
+        ((ApexStreamTuple.DataTuple) results.get(0)).getValue());
+  }
+
+  @Test
+  public void testMultiOutputParDoWithSideInputs() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+    options.setRunner(ApexRunner.class); // non-blocking run
+    Pipeline pipeline = Pipeline.create(options);
+
+    List<Integer> inputs = Arrays.asList(3, -42, 666);
+    final TupleTag<String> mainOutputTag = new TupleTag<>("main");
+    final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");
+
+    PCollectionView<Integer> sideInput1 = pipeline
+        .apply("CreateSideInput1", Create.of(11))
+        .apply("ViewSideInput1", View.<Integer>asSingleton());
+    PCollectionView<Integer> sideInputUnread = pipeline
+        .apply("CreateSideInputUnread", Create.of(-3333))
+        .apply("ViewSideInputUnread", View.<Integer>asSingleton());
+    PCollectionView<Integer> sideInput2 = pipeline
+        .apply("CreateSideInput2", Create.of(222))
+        .apply("ViewSideInput2", View.<Integer>asSingleton());
+
+    PCollectionTuple outputs = pipeline
+        .apply(Create.of(inputs))
+        .apply(ParDo.withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2)
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+            .of(new TestMultiOutputWithSideInputsFn(
+                Arrays.asList(sideInput1, sideInput2),
+                Arrays.<TupleTag<String>>asList())));
+
+     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+     ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+     HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]",
+         "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+     long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+     while (System.currentTimeMillis() < timeout) {
+       if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+         break;
+       }
+       LOG.info("Waiting for expected results.");
+       Thread.sleep(SLEEP_MILLIS);
+     }
+     result.cancel();
+     Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+  }
+
+  private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> {
+    private static final long serialVersionUID = 1L;
+
+    final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
+    final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
+
+    public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews,
+        List<TupleTag<String>> sideOutputTupleTags) {
+      this.sideInputViews.addAll(sideInputViews);
+      this.sideOutputTupleTags.addAll(sideOutputTupleTags);
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      outputToAllWithSideInputs(c, "processing: " + c.element());
+    }
+
+    private void outputToAllWithSideInputs(ProcessContext c, String value) {
+      if (!sideInputViews.isEmpty()) {
+        List<Integer> sideInputValues = new ArrayList<>();
+        for (PCollectionView<Integer> sideInputView : sideInputViews) {
+          sideInputValues.add(c.sideInput(sideInputView));
+        }
+        value += ": " + sideInputValues;
+      }
+      c.output(value);
+      for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) {
+        c.sideOutput(sideOutputTupleTag,
+                     sideOutputTupleTag.getId() + ": " + value);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
new file mode 100644
index 0000000..96ba663
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import com.datatorrent.api.DAG;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translation.utils.CollectionSource;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link ReadUnboundedTranslator}.
+ */
+public class ReadUnboundTranslatorTest {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadUnboundTranslatorTest.class);
+
+  @Test
+  public void test() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    EmbeddedCollector.RESULTS.clear();
+    options.setApplicationName("ReadUnbound");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    List<String> collection = Lists.newArrayList("1", "2", "3", "4", "5");
+    CollectionSource<String> source = new CollectionSource<>(collection, StringUtf8Coder.of());
+    p.apply(Read.from(source))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    DAG dag = result.getApexDAG();
+    DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class);
+
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(collection)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS);
+  }
+
+  @Test
+  public void testReadBounded() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    EmbeddedCollector.RESULTS.clear();
+    options.setApplicationName("ReadBounded");
+    options.setRunner(ApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs());
+    p.apply(Read.from(CountingSource.upTo(10)))
+        .apply(ParDo.of(new EmbeddedCollector()));
+
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    DAG dag = result.getApexDAG();
+    DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
+    Assert.assertNotNull(om);
+    Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class);
+
+    long timeout = System.currentTimeMillis() + 30000;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected results.");
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+  }
+
+  @SuppressWarnings("serial")
+  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
+    protected static final HashSet<Object> RESULTS = new HashSet<>();
+
+    public EmbeddedCollector() {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      RESULTS.add(c.element());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
new file mode 100644
index 0000000..1801358
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+import java.util.Arrays;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ApexStateInternals}. This is based on the tests for
+ * {@code InMemoryStateInternals}.
+ */
+public class ApexStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3");
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
+      WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());
+
+  private ApexStateInternals<String> underTest;
+
+  @Before
+  public void initStateInternals() {
+    underTest = new ApexStateInternals<>(null);
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag returns the contents of both bags
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag returns the contents of both bags
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    AccumulatorCombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState<BoundedWindow> value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoNewNamespace() throws Exception {
+    WatermarkHoldState<BoundedWindow> value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState<BoundedWindow> value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1);
+
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    ApexStateInternals<String> original = new ApexStateInternals<String>(null);
+    ValueState<String> value = original.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertEquals(original.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    value.write("hello");
+
+    ApexStateInternals<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(original));
+    ValueState<String> clonedValue = cloned.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+    assertEquals(cloned.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
new file mode 100644
index 0000000..c3b35f9
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+/**
+ * A fixed Java collection exposed as an {@link UnboundedSource}, for use in tests.
+ */
+public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+  private static final long serialVersionUID = 1L;
+  private final Collection<T> collection;
+  private final Coder<T> coder;
+
+  public CollectionSource(Collection<T> collection, Coder<T> coder) {
+    this.collection = collection;
+    this.coder = coder;
+  }
+
+  @Override
+  public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options,
+      @Nullable UnboundedSource.CheckpointMark checkpointMark) {
+    return new CollectionReader<>(collection, this);
+  }
+
+  @Nullable
+  @Override
+  public Coder<CheckpointMark> getCheckpointMarkCoder() {
+    return null;
+  }
+
+  @Override
+  public void validate() {
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return coder;
+  }
+
+  private static class CollectionReader<T> extends UnboundedSource.UnboundedReader<T>
+      implements Serializable {
+
+    private T current;
+    private final CollectionSource<T> source;
+    private final Collection<T> collection;
+    private Iterator<T> iterator;
+
+    public CollectionReader(Collection<T> collection, CollectionSource<T> source) {
+      this.collection = collection;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      if (null == iterator) {
+        iterator = collection.iterator();
+      }
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (iterator.hasNext()) {
+        current = iterator.next();
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+
+    @Override
+    public UnboundedSource.CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<T, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return current;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
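A minimal usage sketch for CollectionSource, assuming the same runner setup and imports as GroupByKeyTranslatorTest further down in this commit; the element values and coder are illustrative only:

    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
    options.setRunner(ApexRunner.class);
    Pipeline p = Pipeline.create(options);

    // Expose a fixed collection as an unbounded stream of Strings.
    PCollection<String> values = p.apply(
        Read.from(new CollectionSource<>(Arrays.asList("a", "b", "c"), StringUtf8Coder.of())));

Since the reader reports Instant.now() as its watermark and returns a null checkpoint mark, the source is only meant for the embedded-cluster tests in this module.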

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
new file mode 100644
index 0000000..d5eb9a9
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translation.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.datatorrent.common.util.FSStorageAgent;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the serialization of PipelineOptions.
+ */
+public class PipelineOptionsTest {
+
+  /**
+   * Interface for testing.
+   */
+  public interface MyOptions extends ApexPipelineOptions {
+    @Description("Bla bla bla")
+    @Default.String("Hello")
+    String getTestOption();
+    void setTestOption(String value);
+  }
+
+  private static class MyOptionsWrapper {
+    private MyOptionsWrapper() {
+      this(null); // required for Kryo
+    }
+    private MyOptionsWrapper(ApexPipelineOptions options) {
+      this.options = new SerializablePipelineOptions(options);
+    }
+    @Bind(JavaSerializer.class)
+    private final SerializablePipelineOptions options;
+  }
+
+  private static MyOptions options;
+
+  private static final String[] args = new String[]{"--testOption=nothing"};
+
+  @BeforeClass
+  public static void beforeTest() {
+    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+  }
+
+  @Test
+  public void testSerialization() {
+    MyOptionsWrapper wrapper = new MyOptionsWrapper(PipelineOptionsTest.options);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    FSStorageAgent.store(bos, wrapper);
+
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis);
+    assertNotNull(wrapperCopy.options);
+    assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
+  }
+
+}
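FSStorageAgent stores the wrapper with Kryo, and the @Bind(JavaSerializer.class) annotation directs Kryo's FieldSerializer to delegate the options field to Java serialization, which is what exercises the Externalizable read/write path of SerializablePipelineOptions. Below is a minimal sketch of the same round trip driven through Kryo directly (written as if inside PipelineOptionsTest, so the private field is accessible; it assumes com.esotericsoftware.kryo.Kryo and com.esotericsoftware.kryo.io.Input/Output are added to the imports):

    MyOptionsWrapper wrapper = new MyOptionsWrapper(options);

    Kryo kryo = new Kryo();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    Output out = new Output(bytes);
    kryo.writeClassAndObject(out, wrapper);  // honors @Bind(JavaSerializer.class) on the field
    out.close();

    Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()));
    MyOptionsWrapper copy = (MyOptionsWrapper) kryo.readClassAndObject(in);
    in.close();

    assertEquals("nothing", copy.options.get().as(MyOptions.class).getTestOption());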

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
deleted file mode 100644
index 3e8d575..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ApexGroupByKeyOperatorTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators;
-
-import com.datatorrent.api.Sink;
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.runners.apex.translators.functions.ApexGroupByKeyOperator;
-import org.apache.beam.runners.apex.translators.utils.ApexStateInternals;
-import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link ApexGroupByKeyOperator}.
- */
-public class ApexGroupByKeyOperatorTest {
-
-  @Test
-  public void testGlobalWindowMinTimestamp() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.create()
-        .as(ApexPipelineOptions.class);
-    options.setRunner(TestApexRunner.class);
-    Pipeline pipeline = Pipeline.create(options);
-
-    WindowingStrategy<?, ?> ws = WindowingStrategy.of(FixedWindows.of(
-        Duration.standardSeconds(10)));
-    PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal(pipeline,
-        ws, IsBounded.BOUNDED);
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
-
-    ApexGroupByKeyOperator<String, Integer> operator = new ApexGroupByKeyOperator<>(options,
-        input, new ApexStateInternals.ApexStateInternalsFactory<String>()
-        );
-
-    final List<Object> results = Lists.newArrayList();
-    Sink<Object> sink =  new Sink<Object>() {
-      @Override
-      public void put(Object tuple) {
-        results.add(tuple);
-      }
-      @Override
-      public int getCount(boolean reset) {
-        return 0;
-      }
-    };
-    operator.output.setSink(sink);
-    operator.setup(null);
-    operator.beginWindow(1);
-
-    Instant windowStart = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    BoundedWindow window = new IntervalWindow(windowStart, windowStart.plus(10000));
-    PaneInfo paneInfo = PaneInfo.NO_FIRING;
-
-    WindowedValue<KV<String, Integer>> wv1 =
-        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
-    operator.input.process(ApexStreamTuple.DataTuple.of(wv1));
-
-    WindowedValue<KV<String, Integer>> wv2 =
-        WindowedValue.of(KV.of("foo", 1), windowStart, window, paneInfo);
-    operator.input.process(ApexStreamTuple.DataTuple.of(wv2));
-
-    ApexStreamTuple<WindowedValue<KV<String, Integer>>> watermark =
-        ApexStreamTuple.WatermarkTuple.of(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
-
-    Assert.assertEquals("number outputs", 0, results.size());
-    operator.input.process(watermark);
-    Assert.assertEquals("number outputs", 2, results.size());
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    ApexStreamTuple.DataTuple<WindowedValue<KV<String, Iterable<Integer>>>> dataTuple =
-        (ApexStreamTuple.DataTuple) results.get(0);
-    List<Integer> counts = Lists.newArrayList(1, 1);
-    Assert.assertEquals("iterable", KV.of("foo", counts), dataTuple.getValue().getValue());
-    Assert.assertEquals("expected watermark", watermark, results.get(1));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
deleted file mode 100644
index 7defc77..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Integration test for {@link FlattenPCollectionTranslator}.
- */
-public class FlattenPCollectionTranslatorTest {
-  private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class);
-
-  @Test
-  public void test() throws Exception {
-    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
-    options.setApplicationName("FlattenPCollection");
-    options.setRunner(ApexRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    String[][] collections = {
-        {"1"}, {"2"}, {"3"}, {"4"}, {"5"}
-    };
-
-    Set<String> expected = Sets.newHashSet();
-    List<PCollection<String>> pcList = new ArrayList<PCollection<String>>();
-    for (String[] collection : collections) {
-      pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of())));
-      expected.addAll(Arrays.asList(collection));
-    }
-
-    PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections());
-    actual.apply(ParDo.of(new EmbeddedCollector()));
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    // TODO: verify translation
-    result.getApexDAG();
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout
-        && EmbeddedCollector.RESULTS.size() < expected.size()) {
-      LOG.info("Waiting for expected results.");
-      Thread.sleep(500);
-    }
-
-    Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
-    Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
-  }
-
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final ArrayList<Object> RESULTS = new ArrayList<>();
-
-    public EmbeddedCollector() {
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      RESULTS.add(c.element());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5553c603/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
deleted file mode 100644
index e67e29e..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translators;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunner;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Integration test for {@link GroupByKeyTranslator}.
- */
-public class GroupByKeyTranslatorTest {
-
-  @SuppressWarnings({"unchecked"})
-  @Test
-  public void test() throws Exception {
-    ApexPipelineOptions options =
-        PipelineOptionsFactory.as(ApexPipelineOptions.class);
-    options.setApplicationName("GroupByKey");
-    options.setRunner(ApexRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    List<KV<String, Instant>> data =
-        Lists.newArrayList(
-            KV.of("foo", new Instant(1000)), KV.of("foo", new Instant(1000)),
-            KV.of("foo", new Instant(2000)),
-            KV.of("bar", new Instant(1000)), KV.of("bar", new Instant(2000)),
-            KV.of("bar", new Instant(2000))
-        );
-
-    // expected results assume outputAtLatestInputTimestamp
-    List<KV<Instant, KV<String, Long>>> expected =
-        Lists.newArrayList(
-            KV.of(new Instant(1000), KV.of("foo", 2L)),
-            KV.of(new Instant(1000), KV.of("bar", 1L)),
-            KV.of(new Instant(2000), KV.of("foo", 1L)),
-            KV.of(new Instant(2000), KV.of("bar", 2L))
-        );
-
-    p.apply(Read.from(new TestSource(data, new Instant(5000))))
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
-            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
-        .apply(Count.<String>perElement())
-        .apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
-        .apply(ParDo.of(new EmbeddedCollector()))
-        ;
-
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    result.getApexDAG();
-
-    long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-        break;
-      }
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
-
-  }
-
-  @SuppressWarnings("serial")
-  private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> RESULTS = new HashSet<>();
-
-    public EmbeddedCollector() {
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      RESULTS.add(c.element());
-    }
-  }
-
-  private static class KeyedByTimestamp<T> extends OldDoFn<T, KV<Instant, T>> {
-
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(KV.of(c.timestamp(), c.element()));
-    }
-  }
-
-  private static class TestSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
-
-    private final List<KV<String, Instant>> data;
-    private final Instant watermark;
-
-    public TestSource(List<KV<String, Instant>> data, Instant watermark) {
-      this.data = data;
-      this.watermark = watermark;
-    }
-
-    @Override
-    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
-    }
-
-    @Override
-    public UnboundedReader<String> createReader(PipelineOptions options,
-        @Nullable CheckpointMark checkpointMark) {
-      return new TestReader(data, watermark, this);
-    }
-
-    @Nullable
-    @Override
-    public Coder<CheckpointMark> getCheckpointMarkCoder() {
-      return null;
-    }
-
-    @Override
-    public void validate() {
-    }
-
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-
-    private static class TestReader extends UnboundedReader<String> implements Serializable {
-
-      private static final long serialVersionUID = 7526472295622776147L;
-
-      private final List<KV<String, Instant>> data;
-      private final TestSource source;
-
-      private Iterator<KV<String, Instant>> iterator;
-      private String currentRecord;
-      private Instant currentTimestamp;
-      private Instant watermark;
-      private boolean collected;
-
-      public TestReader(List<KV<String, Instant>> data, Instant watermark, TestSource source) {
-        this.data = data;
-        this.source = source;
-        this.watermark = watermark;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        iterator = data.iterator();
-        return advance();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        if (iterator.hasNext()) {
-          KV<String, Instant> kv = iterator.next();
-          collected = false;
-          currentRecord = kv.getKey();
-          currentTimestamp = kv.getValue();
-          return true;
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public byte[] getCurrentRecordId() throws NoSuchElementException {
-        return new byte[0];
-      }
-
-      @Override
-      public String getCurrent() throws NoSuchElementException {
-        collected = true;
-        return this.currentRecord;
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return currentTimestamp;
-      }
-
-      @Override
-      public void close() throws IOException {
-      }
-
-      @Override
-      public Instant getWatermark() {
-        if (!iterator.hasNext() && collected) {
-          return watermark;
-        } else {
-          return new Instant(0);
-        }
-      }
-
-      @Override
-      public CheckpointMark getCheckpointMark() {
-        return null;
-      }
-
-      @Override
-      public UnboundedSource<String, ?> getCurrentSource() {
-        return this.source;
-      }
-    }
-  }
-
-}