You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:24 UTC

[06/39] incubator-beam git commit: BEAM-261 Add support for ParDo.BoundMulti

BEAM-261 Add support for ParDo.BoundMulti


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/047cff49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/047cff49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/047cff49

Branch: refs/heads/master
Commit: 047cff492f1f804785dee73b4768293d3569e8de
Parents: 0975494
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 6 22:36:01 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Oct 16 23:27:15 2016 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  3 +-
 .../runners/apex/ApexPipelineTranslator.java    |  2 +
 .../apache/beam/runners/apex/ApexRunner.java    |  3 +-
 .../FlattenPCollectionTranslator.java           |  1 +
 .../translators/ParDoBoundMultiTranslator.java  | 74 ++++++++++++++++++++
 .../apex/translators/ParDoBoundTranslator.java  |  5 +-
 .../apex/translators/TranslationContext.java    | 17 +++++
 .../functions/ApexFlattenOperator.java          |  2 +
 .../functions/ApexParDoOperator.java            | 68 ++++++++++++++----
 .../FlattenPCollectionTranslatorTest.java       | 42 +++++------
 .../translators/ParDoBoundTranslatorTest.java   | 29 ++++----
 11 files changed, 194 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index e9377b4..929feb4 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,8 +185,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=org.apache.beam.runners.apex.TestApexRunner",
-                    "--streaming=true"
+                    "--runner=org.apache.beam.runners.apex.TestApexRunner"
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index ad8c283..40edfb1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
+import org.apache.beam.runners.apex.translators.ParDoBoundMultiTranslator;
 import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
 import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.apex.translators.TransformTranslator;
@@ -66,6 +67,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
   static {
     // register TransformTranslators
     registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator<>());
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index ae79a20..e2ebc29 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -230,7 +230,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    * Records that the {@link PTransform} requires a deterministic key coder.
    */
   private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
-    throw new UnsupportedOperationException();
+    //throw new UnsupportedOperationException();
   }
 
   /**
@@ -369,7 +369,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
     private final ApexRunner runner;
 
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
     public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) {
       this.runner = runner;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index 712466a..90ab81f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -80,6 +80,7 @@ public class FlattenPCollectionTranslator<T> implements
       if (firstCollection != null) {
         // push to next merge level
         remainingCollections.add(firstCollection);
+        firstCollection = null;
       }
       if (remainingCollections.size() > 1) {
         collections = remainingCollections;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..6488bf6
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.OutputPort;
+import com.google.common.collect.Maps;
+
+/**
+ * {@link ParDo.BoundMulti} is translated to an Apex operator that wraps the {@link OldDoFn}.
+ */
+public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>>  {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    PCollectionTuple output = context.getOutput();
+    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
+        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
+        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs);
+
+    Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
+    Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
+    int i = 0;
+    for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
+      ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]);
+    }
+    context.addOperator(operator, ports);
+
+    context.addStream(context.getInput(), operator.input);
+    if (!sideInputs.isEmpty()) {
+      Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+      for (i=0; i<sideInputs.size(); i++) {
+        // The number of input ports for side inputs is fixed and each port can only take one input.
+        // More (optional) ports can be added to give reasonable capacity, or an explicit union operation introduced.
+        if (i == sideInputPorts.length) {
+          String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
+              transform.toString(), sideInputPorts.length);
+          throw new UnsupportedOperationException(msg);
+        }
+        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index 632829a..fa3df7c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -25,6 +25,8 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 
 import com.datatorrent.api.Operator;
 
@@ -41,7 +43,8 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
     PCollection<OutputT> output = context.getOutput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
-        doFn, output.getWindowingStrategy(), sideInputs);
+        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
+        output.getWindowingStrategy(), sideInputs);
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index 163cfd4..bd44a20 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -98,6 +98,23 @@ public class TranslationContext {
   }
 
   /**
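+   * Registers the operator and its output ports for the given collections.
+   * @param operator the operator that owns the given output ports
+   * @param ports mapping from each output collection to the operator output port that feeds it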
+   */
+  public void addOperator(Operator operator, Map<PCollection<?>, OutputPort<?>> ports) {
+    boolean first = true;
+    for (Map.Entry<PCollection<?>, OutputPort<?>> portEntry : ports.entrySet()) {
+      if (first) {
+        addOperator(operator, portEntry.getValue(), portEntry.getKey());
+        first = false;
+      } else {
+        this.streams.put(portEntry.getKey(), (Pair)new ImmutablePair<>(portEntry.getValue(), new ArrayList<>()));
+      }
+    }
+  }
+
+  /**
    * Add intermediate operator for the current transformation.
    * @param operator
    * @param port

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
index ce27abb..4675244 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 
 /**
@@ -109,5 +110,6 @@ public class ApexFlattenOperator<InputT> extends BaseOperator
   /**
    * Output port.
    */
+  @OutputPortFieldAnnotation(optional=true)
   public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 13a8fc9..995fee1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.beam.runners.apex.translators.functions;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +58,7 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 /**
@@ -68,43 +68,58 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
   private boolean traceTuples = true;
 
-  private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
-  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
-
   @Bind(JavaSerializer.class)
   private final SerializablePipelineOptions pipelineOptions;
   @Bind(JavaSerializer.class)
   private final OldDoFn<InputT, OutputT> doFn;
   @Bind(JavaSerializer.class)
+  private final TupleTag<OutputT> mainOutputTag;
+  @Bind(JavaSerializer.class)
+  private final List<TupleTag<?>> sideOutputTags;
+  @Bind(JavaSerializer.class)
   private final WindowingStrategy<?, ?> windowingStrategy;
   @Bind(JavaSerializer.class)
-  List<PCollectionView<?>> sideInputs;
+  private final List<PCollectionView<?>> sideInputs;
+
 // TODO: not Kryo serializable, integrate codec
 //@Bind(JavaSerializer.class)
 private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
-  private transient SideInputHandler sideInputHandler;
   // TODO: not Kryo serializable, integrate codec
   private List<WindowedValue<InputT>> pushedBack = new ArrayList<>();
   private LongMin pushedBackWatermark = new LongMin();
   private long currentInputWatermark = Long.MIN_VALUE;
   private long currentOutputWatermark = currentInputWatermark;
 
+  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
+  private transient SideInputHandler sideInputHandler;
+  private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = Maps.newHashMapWithExpectedSize(5);
+
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
       OldDoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy,
       List<PCollectionView<?>> sideInputs
       )
   {
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
     this.doFn = doFn;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
+
+    if (sideOutputTags != null && sideOutputTags.size() > sideOutputPorts.length) {
+      String msg = String.format("Too many side outputs (currently only supporting %s).",
+          sideOutputPorts.length);
+      throw new UnsupportedOperationException(msg);
+    }
   }
 
   @SuppressWarnings("unused") // for Kryo
   private ApexParDoOperator() {
-    this(null, null, null, null);
+    this(null, null, null, null, null, null);
   }
 
 
@@ -167,10 +182,28 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
   @OutputPortFieldAnnotation(optional=true)
   public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>();
 
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = new DefaultOutputPort<>();
+  @OutputPortFieldAnnotation(optional=true)
+  public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, sideOutput3, sideOutput4, sideOutput5};
+
   @Override
   public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
   {
-    output.emit(ApexStreamTuple.DataTuple.of(tuple));
+    DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag);
+    if (sideOutputPort != null) {
+      sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
+    } else {
+      output.emit(ApexStreamTuple.DataTuple.of(tuple));
+    }
     if (traceTuples) {
       LOG.debug("\nemitting {}\n", tuple);
     }
@@ -178,7 +211,10 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
 
   private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
     try {
-      return pushbackDoFnRunner.processElementInReadyWindows(elem);
+      pushbackDoFnRunner.startBundle();
+      Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner.processElementInReadyWindows(elem);
+      pushbackDoFnRunner.finishBundle();
+      return pushedBack;
     } catch (UserCodeException ue) {
       if (ue.getCause() instanceof AssertionError) {
         ApexRunner.assertionError = (AssertionError)ue.getCause();
@@ -220,13 +256,19 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       sideInputReader = sideInputHandler;
     }
 
+    for (int i=0; i < sideOutputTags.size(); i++) {
+      @SuppressWarnings("unchecked")
+      DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>)sideOutputPorts[i];
+      sideOutputPortMapping.put(sideOutputTags.get(i), port);
+    }
+
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
         pipelineOptions.get(),
         doFn,
         sideInputReader,
         this,
-        mainTag,
-        TupleTagList.empty().getAll() /*sideOutputTags*/,
+        mainOutputTag,
+        sideOutputTags,
         new NoOpStepContext(),
         new NoOpAggregatorFactory(),
         windowingStrategy
@@ -246,7 +288,6 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
   @Override
   public void beginWindow(long windowId)
   {
-    pushbackDoFnRunner.startBundle();
     /*
     Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
     if (!aggregators.isEmpty()) {
@@ -258,7 +299,6 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
   @Override
   public void endWindow()
   {
-    pushbackDoFnRunner.finishBundle();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
index d3b56bc..6b181ba 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
@@ -31,15 +31,17 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 /**
  * integration test for {@link FlattenPCollectionTranslator}.
@@ -49,41 +51,41 @@ public class FlattenPCollectionTranslatorTest {
 
   @Test
   public void test() throws Exception {
-    ApexPipelineOptions options =
-        PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
     options.setApplicationName("FlattenPCollection");
     options.setRunner(ApexRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    List<String> collection1 = Lists.newArrayList("1", "2", "3");
-    List<String> collection2 = Lists.newArrayList("4", "5");
-    List<String> expected = Lists.newArrayList("1", "2", "3", "4", "5");
-    PCollection<String> pc1 =
-        p.apply(Create.of(collection1).withCoder(StringUtf8Coder.of()));
-    PCollection<String> pc2 =
-        p.apply(Create.of(collection2).withCoder(StringUtf8Coder.of()));
-    PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2);
-    PCollection<String> actual = pcs.apply(Flatten.<String>pCollections());
+    String[][] collections = {
+        {"1"}, {"2"}, {"3"}, {"4"}, {"5"}
+    };
+
+    Set<String> expected = Sets.newHashSet();
+    List<PCollection<String>> pcList = new ArrayList<PCollection<String>>();
+    for (String[] collection : collections) {
+      pcList.add(p.apply(Create.of(collection).withCoder(StringUtf8Coder.of())));
+      expected.addAll(Arrays.asList(collection));
+    }
+
+    PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections());
     actual.apply(ParDo.of(new EmbeddedCollector()));
 
     ApexRunnerResult result = (ApexRunnerResult)p.run();
     // TODO: verify translation
     result.getApexDAG();
     long timeout = System.currentTimeMillis() + 30000;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.results.containsAll(expected)) {
-        break;
-      }
+    while (System.currentTimeMillis() < timeout && EmbeddedCollector.results.size() < expected.size()) {
       LOG.info("Waiting for expected results.");
-      Thread.sleep(1000);
+      Thread.sleep(500);
     }
-    org.junit.Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
 
+    Assert.assertEquals("number results", expected.size(), EmbeddedCollector.results.size());
+    Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.results));
   }
 
   @SuppressWarnings("serial")
   private static class EmbeddedCollector extends OldDoFn<Object, Void> {
-    protected static final HashSet<Object> results = new HashSet<>();
+    protected static final ArrayList<Object> results = new ArrayList<>();
 
     public EmbeddedCollector() {
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/047cff49/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 6239021..301f6f8 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -36,6 +36,8 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.lib.util.KryoCloneUtils;
@@ -129,6 +131,18 @@ public class ParDoBoundTranslatorTest {
     }
   }
 
+  private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
+    // We cannot use thrown.expect(AssertionError.class) because the AssertionError
+    // is first caught by JUnit and causes a test failure.
+    try {
+      pipeline.run();
+    } catch (AssertionError exc) {
+      return exc;
+    }
+    fail("assertion should have failed");
+    throw new RuntimeException("unreachable");
+  }
+
   @Test
   public void testAssertionFailure() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
@@ -163,24 +177,13 @@ public class ParDoBoundTranslatorTest {
     pipeline.run();
   }
 
-  private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
-    // We cannot use thrown.expect(AssertionError.class) because the AssertionError
-    // is first caught by JUnit and causes a test failure.
-    try {
-      pipeline.run();
-    } catch (AssertionError exc) {
-      return exc;
-    }
-    fail("assertion should have failed");
-    throw new RuntimeException("unreachable");
-  }
-
   @Test
   public void testSerialization() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
     ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
-        new Add(0), WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList());
+        new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
+        WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList());
     operator.setup(null);
     operator.beginWindow(0);
     WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);