You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/11 01:37:08 UTC

[GitHub] johnyangk closed pull request #122: [NEMO-213] Use Beam's DoFnRunners to execute DoFn

johnyangk closed pull request #122: [NEMO-213] Use Beam's DoFnRunners to execute DoFn
URL: https://github.com/apache/incubator-nemo/pull/122
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display
its diff once the request has been merged, so the full diff is reproduced below:

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index c3f4bbca9..b64cb6041 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.KeyExtractor;
 import org.apache.beam.sdk.values.KV;
 
@@ -25,9 +26,11 @@
 final class BeamKeyExtractor implements KeyExtractor {
   @Override
   public Object extractKey(final Object element) {
-    if (element instanceof KV) {
+    final WindowedValue windowedValue = (WindowedValue) element;
+    final Object value = windowedValue.getValue();
+    if (value instanceof KV) {
       // Handle null keys, since Beam allows KV with null keys.
-      final Object key = ((KV) element).getKey();
+      final Object key = ((KV) value).getKey();
       return key == null ? 0 : key;
     } else {
       return element;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
index e3043bdf2..deb4dfea3 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
@@ -56,8 +56,9 @@ private NemoPipelineRunner(final NemoPipelineOptions nemoPipelineOptions) {
   public NemoPipelineResult run(final Pipeline pipeline) {
     final PipelineVisitor pipelineVisitor = new PipelineVisitor();
     pipeline.traverseTopologically(pipelineVisitor);
-    final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipelineVisitor.getConvertedPipeline(),
-        nemoPipelineOptions);
+    final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipeline,
+      pipelineVisitor.getConvertedPipeline(),
+      nemoPipelineOptions);
 
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
     JobLauncher.launchDAG(dag);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 6ef96bf5b..2486a00f6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -15,6 +15,12 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.TransformInputs;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.edge.IREdge;
@@ -38,6 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -45,17 +52,19 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * Converts DAG of Beam pipeline to Nemo IR DAG.
+ * Converts the Beam transform hierarchy, starting at its root vertex, to a Nemo IR DAG.
  * For a {@link PrimitiveTransformVertex}, it defines mapping to the corresponding {@link IRVertex}.
  * For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
  * before start translating inner Beam transform hierarchy.
  */
 public final class PipelineTranslator
-    implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+  implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
 
@@ -64,15 +73,21 @@
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
   private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
 
+  // TODO #220: Move this variable to TranslationContext
+  private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
+
   /**
    * Static translator method.
-   * @param pipeline Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
+   * @param pipeline the original Beam pipeline
+   * @param root Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
    * @param pipelineOptions {@link PipelineOptions}
    * @return Nemo IR DAG
    */
-  public static DAG<IRVertex, IREdge> translate(final CompositeTransformVertex pipeline,
+  public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
+                                                final CompositeTransformVertex root,
                                                 final PipelineOptions pipelineOptions) {
-    return INSTANCE.apply(pipeline, pipelineOptions);
+    PIPELINE.set(pipeline);
+    return INSTANCE.apply(root, pipelineOptions);
   }
 
   /**
@@ -113,38 +128,74 @@ private static void boundedReadTranslator(final TranslationContext ctx,
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
   }
 
+  private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
+                                                   final PrimitiveTransformVertex transformVertex) {
+    try {
+      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+      final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+      final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
+      final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
+      final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
+
+      final PCollection<?> mainInput = (PCollection<?>)
+        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+
+      return new DoFnTransform(
+        doFn,
+        mainInput.getCoder(),
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        additionalOutputTags.getAll(),
+        mainInput.getWindowingStrategy(),
+        sideInputs,
+        ctx.pipelineOptions);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @PrimitiveTransformTranslator(ParDo.SingleOutput.class)
   private static void parDoSingleOutputTranslator(final TranslationContext ctx,
                                                   final PrimitiveTransformVertex transformVertex,
                                                   final ParDo.SingleOutput<?, ?> transform) {
-    final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
-    final IRVertex vertex = new OperatorVertex(doTransform);
+    final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+    final IRVertex vertex = new OperatorVertex(doFnTransform);
+
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().stream()
-        .filter(input -> !transform.getAdditionalInputs().values().contains(input))
-        .forEach(input -> ctx.addEdgeTo(vertex, input));
+      .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+      .forEach(input -> ctx.addEdgeTo(vertex, input));
     transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
   }
 
+  private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final AppliedPTransform<?, ?, ?> ptransform) {
+    return ptransform
+      .getOutputs()
+      .entrySet()
+      .stream()
+      .filter(e -> e.getValue() instanceof PCollection)
+      .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+  }
+
   @PrimitiveTransformTranslator(ParDo.MultiOutput.class)
   private static void parDoMultiOutputTranslator(final TranslationContext ctx,
                                                  final PrimitiveTransformVertex transformVertex,
                                                  final ParDo.MultiOutput<?, ?> transform) {
-    final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
-    final IRVertex vertex = new OperatorVertex(doTransform);
+    final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+    final IRVertex vertex = new OperatorVertex(doFnTransform);
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().stream()
-        .filter(input -> !transform.getAdditionalInputs().values().contains(input))
-        .forEach(input -> ctx.addEdgeTo(vertex, input));
+      .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+      .forEach(input -> ctx.addEdgeTo(vertex, input));
     transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().entrySet().stream()
-        .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
-        .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
+      .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+      .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
     transformVertex.getNode().getOutputs().entrySet().stream()
-        .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
-        .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
-            pValueWithTupleTag.getKey()));
+      .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+      .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
+        pValueWithTupleTag.getKey()));
   }
 
   @PrimitiveTransformTranslator(GroupByKey.class)
@@ -169,7 +220,7 @@ private static void windowTranslator(final TranslationContext ctx,
     } else {
       throw new UnsupportedOperationException(String.format("%s is not supported", transform));
     }
-    final IRVertex vertex = new OperatorVertex(new WindowTransform(windowFn));
+    final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
@@ -225,7 +276,7 @@ private static void combineTranslator(final TranslationContext ctx,
     final boolean handlesBeamRow = Stream
       .concat(transformVertex.getNode().getInputs().values().stream(),
         transformVertex.getNode().getOutputs().values().stream())
-      .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
+      .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
       .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
       .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
     if (handlesBeamRow) {
@@ -280,7 +331,8 @@ private static void loopTranslator(final TranslationContext ctx,
   }
 
   @Override
-  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline, final PipelineOptions pipelineOptions) {
+  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
+                                     final PipelineOptions pipelineOptions) {
     final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
         compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
     ctx.translate(pipeline);
@@ -351,7 +403,7 @@ private static void loopTranslator(final TranslationContext ctx,
    * Translation context.
    */
   private static final class TranslationContext {
-    private final CompositeTransformVertex pipeline;
+    private final CompositeTransformVertex root;
     private final PipelineOptions pipelineOptions;
     private final DAGBuilder<IRVertex, IREdge> builder;
     private final Map<PValue, IRVertex> pValueToProducer;
@@ -363,18 +415,18 @@ private static void loopTranslator(final TranslationContext ctx,
     private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
 
     /**
-     * @param pipeline the pipeline to translate
+     * @param root the root to translate
      * @param primitiveTransformToTranslator provides translators for PrimitiveTransform
      * @param compositeTransformToTranslator provides translators for CompositeTransform
      * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
      * @param pipelineOptions {@link PipelineOptions}
      */
-    private TranslationContext(final CompositeTransformVertex pipeline,
+    private TranslationContext(final CompositeTransformVertex root,
                                final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
                                final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
                                final PipelineOptions pipelineOptions) {
-      this.pipeline = pipeline;
+      this.root = root;
       this.builder = new DAGBuilder<>();
       this.pValueToProducer = new HashMap<>();
       this.pValueToTag = new HashMap<>();
@@ -393,7 +445,7 @@ private TranslationContext(final CompositeTransformVertex pipeline,
      */
     private TranslationContext(final TranslationContext ctx,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
-      this.pipeline = ctx.pipeline;
+      this.root = ctx.root;
       this.pipelineOptions = ctx.pipelineOptions;
       this.builder = ctx.builder;
       this.pValueToProducer = ctx.pValueToProducer;
@@ -465,7 +517,7 @@ private void addEdgeTo(final IRVertex dst, final PValue input) {
       if (src == null) {
         try {
           throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
-              + "while PTransform %s is known to produce it.", input, pipeline.getPrimitiveProducerOf(input)));
+              + "while PTransform %s is known to produce it.", input, root.getPrimitiveProducerOf(input)));
         } catch (final RuntimeException e) {
           throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
               + "and the corresponding PTransform was not found", input));
@@ -477,17 +529,18 @@ private void addEdgeTo(final IRVertex dst, final PValue input) {
             + "for an edge from %s to %s", communicationPatternSelector, src, dst));
       }
       final IREdge edge = new IREdge(communicationPattern, src, dst);
-      final Coder<?> coder;
+      final Coder coder;
+      final Coder windowCoder;
       if (input instanceof PCollection) {
         coder = ((PCollection) input).getCoder();
+        windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
       } else if (input instanceof PCollectionView) {
-        coder = getCoderForView((PCollectionView) input, pipeline);
+        coder = getCoderForView((PCollectionView) input, root);
+        windowCoder = ((PCollectionView) input).getPCollection()
+          .getWindowingStrategy().getWindowFn().windowCoder();
       } else {
-        coder = null;
-      }
-      if (coder == null) {
         throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
-            + "be determined", src, dst, input));
+          + "be determined", src, dst, input));
       }
 
       edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
@@ -497,8 +550,11 @@ private void addEdgeTo(final IRVertex dst, final PValue input) {
         edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
         edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
       }
-      edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
-      edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
+
+      edge.setProperty(EncoderProperty.of(
+        new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
+      edge.setProperty(DecoderProperty.of(
+        new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
 
       if (pValueToTag.containsKey(input)) {
         edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
@@ -553,7 +609,7 @@ private void registerAdditionalOutputFrom(final IRVertex irVertex, final PValue
 
       final Transform srcTransform = src instanceof OperatorVertex ? ((OperatorVertex) src).getTransform() : null;
       final Transform dstTransform = dst instanceof OperatorVertex ? ((OperatorVertex) dst).getTransform() : null;
-      final DoFn srcDoFn = srcTransform instanceof DoTransform ? ((DoTransform) srcTransform).getDoFn() : null;
+      final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? ((DoFnTransform) srcTransform).getDoFn() : null;
 
       if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
         return CommunicationPatternProperty.Value.Shuffle;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f17ab9825..09c9076e6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -16,6 +16,7 @@
 package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 
 import java.io.IOException;
@@ -35,7 +36,7 @@
  * SourceVertex implementation for BoundedSource.
  * @param <O> output type.
  */
-public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
+public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue<O>> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
   private BoundedSource<O> source;
   private final String sourceDescription;
@@ -68,8 +69,8 @@ public BeamBoundedSourceVertex getClone() {
   }
 
   @Override
-  public List<Readable<O>> getReadables(final int desiredNumOfSplits) throws Exception {
-    final List<Readable<O>> readables = new ArrayList<>();
+  public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
+    final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
     LOG.info("estimate: {}", source.getEstimatedSizeBytes(null));
     LOG.info("desired: {}", desiredNumOfSplits);
     source.split(source.getEstimatedSizeBytes(null) / desiredNumOfSplits, null)
@@ -93,7 +94,7 @@ public ObjectNode getPropertiesAsJsonNode() {
    * BoundedSourceReadable class.
    * @param <T> type.
    */
-  private static final class BoundedSourceReadable<T> implements Readable<T> {
+  private static final class BoundedSourceReadable<T> implements Readable<WindowedValue<T>> {
     private final BoundedSource<T> boundedSource;
 
     /**
@@ -105,11 +106,29 @@ public ObjectNode getPropertiesAsJsonNode() {
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
-      final ArrayList<T> elements = new ArrayList<>();
+    public Iterable<WindowedValue<T>> read() throws IOException {
+      boolean started = false;
+      boolean windowed = false;
+
+      final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
       try (BoundedSource.BoundedReader<T> reader = boundedSource.createReader(null)) {
         for (boolean available = reader.start(); available; available = reader.advance()) {
-          elements.add(reader.getCurrent());
+          final T elem = reader.getCurrent();
+
+          // Check whether the element is windowed or not
+          // We only have to check the first element.
+          if (!started) {
+            started = true;
+            if (elem instanceof WindowedValue) {
+              windowed = true;
+            }
+          }
+
+          if (!windowed) {
+            elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+          } else {
+            elements.add((WindowedValue<T>) elem);
+          }
         }
       }
       return elements;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
new file mode 100644
index 000000000..4725fa9c1
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+/**
+ * A side input reader that reads side input values from the Nemo transform context.
+ */
+public final class BroadcastVariableSideInputReader implements SideInputReader {
+
+  // Nemo context for storing/getting side inputs
+  private final Transform.Context context;
+
+  // The list of side inputs that we're handling
+  private final Collection<PCollectionView<?>> sideInputs;
+
+  BroadcastVariableSideInputReader(final Transform.Context context,
+                                   final Collection<PCollectionView<?>> sideInputs) {
+    this.context = context;
+    this.sideInputs = sideInputs;
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+    // TODO #216: implement side input and windowing
+    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
+  }
+
+  @Override
+  public <T> boolean contains(final PCollectionView<T> view) {
+    return sideInputs.contains(view);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index cd9329657..91fb3e21c 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.transforms.Materializations;
@@ -31,9 +32,9 @@
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class CreateViewTransform<I, O> implements Transform<I, O> {
+public final class CreateViewTransform<I, O> implements Transform<WindowedValue<I>, WindowedValue<O>> {
   private final PCollectionView pCollectionView;
-  private OutputCollector<O> outputCollector;
+  private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
   private final MultiView<Object> multiView;
 
@@ -48,21 +49,22 @@ public CreateViewTransform(final PCollectionView<O> pCollectionView) {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> oc) {
+  public void prepare(final Context context, final OutputCollector<WindowedValue<O>> oc) {
     this.outputCollector = oc;
   }
 
   @Override
-  public void onData(final I element) {
-    // Since CreateViewTransform takes KV(Void, value), this is okay
-    final KV<?, ?> kv = (KV<?, ?>) element; // It will throw a type cast exception if the element is not KV
+  public void onData(final WindowedValue<I> element) {
+    // TODO #216: support window in view
+    final KV kv = ((WindowedValue<KV>) element).getValue();
     multiView.getDataList().add(kv.getValue());
   }
 
   @Override
   public void close() {
     final Object view = viewFn.apply(multiView);
-    outputCollector.emit((O) view);
+    // TODO #216: support window in view
+    outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
new file mode 100644
index 000000000..4174c6ca0
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import java.util.Map;
+
+/**
+ * Default output emitter that uses outputCollector.
+ * @param <OutputT> output type
+ */
+public final class DefaultOutputManager<OutputT> implements DoFnRunners.OutputManager {
+  private final TupleTag<OutputT> mainOutputTag;
+  private final OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private final Map<String, String> additionalOutputs;
+
+  DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>> outputCollector,
+                       final Transform.Context context,
+                       final TupleTag<OutputT> mainOutputTag) {
+    this.outputCollector = outputCollector;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputs = context.getTagToAdditionalChildren();
+  }
+
+  @Override
+  public <T> void output(final TupleTag<T> tag, final WindowedValue<T> output) {
+    if (tag.equals(mainOutputTag)) {
+      outputCollector.emit((WindowedValue<OutputT>) output);
+    } else {
+      outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+    }
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
new file mode 100644
index 000000000..8dbf0518e
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation that runs the DoFn through a Beam {@link DoFnRunner} built in {@code prepare}.
+ *
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class DoFnTransform<InputT, OutputT> implements
+  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+  private OutputCollector<WindowedValue<OutputT>> outputCollector; // assigned in prepare()
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> additionalOutputTags;
+  private final Collection<PCollectionView<?>> sideInputs;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final DoFn<InputT, OutputT> doFn;
+  private final SerializablePipelineOptions serializedOptions; // wraps the options given to the constructor
+  private transient DoFnRunner<InputT, OutputT> doFnRunner; // created in prepare()
+  private transient SideInputReader sideInputReader; // created in prepare()
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker; // created in prepare()
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  /**
+   * DoFnTransform Constructor.
+   * Stores its arguments, wrapping options in SerializablePipelineOptions; the runner is built in {@code prepare}.
+   * @param doFn    doFn.
+   * @param options Pipeline options.
+   */
+  public DoFnTransform(final DoFn<InputT, OutputT> doFn,
+                       final Coder<InputT> inputCoder,
+                       final Map<TupleTag<?>, Coder<?>> outputCoders,
+                       final TupleTag<OutputT> mainOutputTag,
+                       final List<TupleTag<?>> additionalOutputTags,
+                       final WindowingStrategy<?, ?> windowingStrategy,
+                       final Collection<PCollectionView<?>> sideInputs,
+                       final PipelineOptions options) {
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
+    // deserialize the pipeline options and view them as NemoPipelineOptions
+    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
+
+    this.outputCollector = oc;
+
+    // create output manager backed by the output collector
+    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
+      outputCollector, context, mainOutputTag);
+
+    // create side input reader: broadcast-variable based if side inputs exist, null reader otherwise
+    if (!sideInputs.isEmpty()) {
+      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
+    } else {
+      sideInputReader = NullSideInputReader.of(sideInputs);
+    }
+
+    // create step context
+    // state and timers are not supported: both accessors below throw UnsupportedOperationException.
+    final StepContext stepContext = new StepContext() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
+      }
+    };
+
+    // invoker: used here for setup and in close() for teardown
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn;
+    // this approach is the standard used by most of the Beam runners.
+    doFnRunner = DoFnRunners.simpleRunner(
+      options,
+      doFn,
+      sideInputReader,
+      outputManager,
+      mainOutputTag,
+      additionalOutputTags,
+      stepContext,
+      inputCoder,
+      outputCoders,
+      windowingStrategy);
+
+    doFnRunner.startBundle(); // the bundle started here is finished in close()
+  }
+
+  @Override
+  public void onData(final WindowedValue<InputT> data) {
+    doFnRunner.processElement(data); // hand the element to the runner created in prepare()
+  }
+
+  public DoFn getDoFn() { // accessor for the DoFn given to the constructor
+    return doFn;
+  }
+
+  @Override
+  public void close() {
+    doFnRunner.finishBundle(); // finishes the bundle started in prepare()
+    doFnInvoker.invokeTeardown(); // counterpart of invokeSetup() in prepare()
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("DoTransform:" + doFn); // NOTE(review): prefix still reads "DoTransform", not "DoFnTransform"
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
deleted file mode 100644
index 3a36ec0d8..000000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * DoFn transform implementation.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
-public final class DoTransform<I, O> implements Transform<I, O> {
-  private final DoFn doFn;
-  private final ObjectMapper mapper;
-  private final String serializedOptions;
-  private OutputCollector<O> outputCollector;
-  private StartBundleContext startBundleContext;
-  private FinishBundleContext finishBundleContext;
-  private ProcessContext processContext;
-  private DoFnInvoker invoker;
-
-  /**
-   * DoTransform Constructor.
-   *
-   * @param doFn    doFn.
-   * @param options Pipeline options.
-   */
-  public DoTransform(final DoFn doFn, final PipelineOptions options) {
-    this.doFn = doFn;
-    this.mapper = new ObjectMapper();
-    try {
-      this.serializedOptions = mapper.writeValueAsString(options);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void prepare(final Context context, final OutputCollector<O> oc) {
-    this.outputCollector = oc;
-    this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
-    this.finishBundleContext = new FinishBundleContext(doFn, outputCollector, serializedOptions);
-    this.processContext = new ProcessContext(doFn, outputCollector, context, serializedOptions);
-    this.invoker = DoFnInvokers.invokerFor(doFn);
-    invoker.invokeSetup();
-    invoker.invokeStartBundle(startBundleContext);
-  }
-
-  @Override
-  public void onData(final I data) {
-    processContext.setElement(data);
-    invoker.invokeProcessElement(processContext);
-  }
-
-  @Override
-  public void close() {
-    invoker.invokeFinishBundle(finishBundleContext);
-    invoker.invokeTeardown();
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + doFn);
-    return sb.toString();
-  }
-
-  /**
-   * StartBundleContext.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class StartBundleContext<I, O> extends DoFn<I, O>.StartBundleContext {
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * StartBundleContext.
-     *
-     * @param fn                DoFn.
-     * @param serializedOptions serialized options of the DoTransform.
-     */
-    StartBundleContext(final DoFn<I, O> fn,
-                       final String serializedOptions) {
-      fn.super();
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-  }
-
-  /**
-   * FinishBundleContext.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class FinishBundleContext<I, O> extends DoFn<I, O>.FinishBundleContext {
-    private final OutputCollector<O> outputCollector;
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * Constructor.
-     *
-     * @param fn                DoFn.
-     * @param outputCollector   outputCollector of the DoTransform.
-     * @param serializedOptions serialized options of the DoTransform.
-     */
-    FinishBundleContext(final DoFn<I, O> fn,
-                        final OutputCollector<O> outputCollector,
-                        final String serializedOptions) {
-      fn.super();
-      this.outputCollector = outputCollector;
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public void output(final O output, final Instant instant, final BoundedWindow boundedWindow) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public <T> void output(final TupleTag<T> tupleTag,
-                           final T t,
-                           final Instant instant,
-                           final BoundedWindow boundedWindow) {
-      throw new UnsupportedOperationException("output(TupleTag, T, Instant, BoundedWindow)"
-          + "in FinishBundleContext under DoTransform");
-    }
-  }
-
-  /**
-   * ProcessContext class. Reference: SimpleDoFnRunner.DoFnProcessContext in BEAM.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class ProcessContext<I, O> extends DoFn<I, O>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<I, O> {
-    private I input;
-    private final OutputCollector<O> outputCollector;
-    private final Map<String, String> additionalOutputs;
-    private final Context context;
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * ProcessContext Constructor.
-     *
-     * @param fn                 Dofn.
-     * @param outputCollector    OutputCollector.
-     * @param context            Context.
-     * @param serializedOptions  Options, serialized.
-     */
-    ProcessContext(final DoFn<I, O> fn,
-                   final OutputCollector<O> outputCollector,
-                   final Context context,
-                   final String serializedOptions) {
-      fn.super();
-      this.outputCollector = outputCollector;
-      this.context = context;
-      this.additionalOutputs = context.getTagToAdditionalChildren();
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Setter for input element.
-     *
-     * @param in input element.
-     */
-    void setElement(final I in) {
-      this.input = in;
-    }
-
-    @Override
-    public I element() {
-      return this.input;
-    }
-
-    @Override
-    public Row asRow(final String id) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> T sideInput(final PCollectionView<T> view) {
-      return (T) context.getBroadcastVariable(view);
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException("timestamp() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
-    }
-
-    @Override
-    public void updateWatermark(final Instant instant) {
-      throw new UnsupportedOperationException("updateWatermark() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return this.options;
-    }
-
-    @Override
-    public void output(final O output) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(final O output, final Instant timestamp) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public <T> void output(final TupleTag<T> tupleTag, final T t) {
-      final Object dstVertexId = additionalOutputs.get(tupleTag.getId());
-
-      if (dstVertexId == null) {
-        outputCollector.emit((O) t);
-      } else {
-        outputCollector.emit(additionalOutputs.get(tupleTag.getId()), t);
-      }
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(final TupleTag<T> tupleTag, final T t, final Instant instant) {
-      throw new UnsupportedOperationException("output(TupleTag, T, Instant) in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // Unbounded windows are not supported for now.
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo paneInfo(final DoFn<I, O> doFn) {
-      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
-    }
-
-    @Override
-    public PipelineOptions pipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public DoFn<I, O>.StartBundleContext startBundleContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
-    }
-
-    @Override
-    public DoFn<I, O>.FinishBundleContext finishBundleContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("FinishBundleContext parameters are not supported.");
-    }
-
-    @Override
-    public DoFn.ProcessContext
-    processContext(final DoFn<I, O> doFn) {
-      return this;
-    }
-
-    @Override
-    public DoFn.OnTimerContext
-    onTimerContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("onTimerContext() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public I element(final DoFn<I, O> doFn) {
-      return this.input;
-    }
-
-    @Override
-    public Instant timestamp(final DoFn<I, O> doFn) {
-      return Instant.now();
-    }
-
-    @Override
-    public RestrictionTracker<?, ?> restrictionTracker() {
-      throw new UnsupportedOperationException("restrictionTracker() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public TimeDomain timeDomain(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("timeDomain() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<O> outputReceiver(final DoFn<I, O> doFn) {
-      return new OutputReceiver<>((OutputCollectorImpl) outputCollector);
-    }
-
-    @Override
-    public DoFn.OutputReceiver<Row> outputRowReceiver(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoFn.MultiOutputReceiver taggedOutputReceiver(final DoFn<I, O> doFn) {
-      return new MultiOutputReceiver((OutputCollectorImpl) outputCollector, additionalOutputs);
-    }
-
-    @Override
-    public State state(final String stateId) {
-      throw new UnsupportedOperationException("state() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public Timer timer(final String timerId) {
-      throw new UnsupportedOperationException("timer() in ProcessContext under DoTransform");
-    }
-  }
-
-  /**
-   * @return {@link DoFn} for this transform.
-   */
-  public DoFn getDoFn() {
-    return doFn;
-  }
-
-  /**
-   * OutputReceiver class.
-   * @param <O> output type
-   */
-  static final class OutputReceiver<O> implements DoFn.OutputReceiver<O> {
-    private final List<O> dataElements;
-
-    OutputReceiver(final OutputCollectorImpl<O> outputCollector) {
-      this.dataElements = outputCollector.getMainTagOutputQueue();
-    }
-
-    OutputReceiver(final OutputCollectorImpl outputCollector,
-                   final TupleTag<O> tupleTag,
-                   final Map<String, String> tagToVertex) {
-      final Object dstVertexId = tagToVertex.get(tupleTag.getId());
-      if (dstVertexId == null) {
-        this.dataElements = outputCollector.getMainTagOutputQueue();
-      } else {
-        this.dataElements = (List<O>) outputCollector.getAdditionalTagOutputQueue((String) dstVertexId);
-      }
-    }
-
-    @Override
-    public void output(final O output) {
-      dataElements.add(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(final O output, final Instant timestamp) {
-      dataElements.add(output);
-    }
-  }
-
-  /**
-   * MultiOutputReceiver class.
-   */
-  static final class MultiOutputReceiver implements DoFn.MultiOutputReceiver {
-    private final OutputCollectorImpl outputCollector;
-    private final Map<String, String> tagToVertex;
-
-    /**
-     * Constructor.
-     * @param outputCollector outputCollector
-     * @param tagToVertex     tag to vertex map
-     */
-    MultiOutputReceiver(final OutputCollectorImpl outputCollector,
-                               final Map<String, String> tagToVertex) {
-      this.outputCollector = outputCollector;
-      this.tagToVertex = tagToVertex;
-    }
-
-    @Override
-    public <T> DoFn.OutputReceiver<T> get(final TupleTag<T> tag) {
-      return new OutputReceiver<>(this.outputCollector, tag, tagToVertex);
-    }
-
-    @Override
-    public <T> OutputReceiver<Row> getRowReceiver(final TupleTag<T> tag) {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index b0e64290a..38b26411d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.values.KV;
@@ -27,10 +28,10 @@
  * Group Beam KVs.
  * @param <I> input type.
  */
-public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, List>> {
+public final class GroupByKeyTransform<I> implements Transform<I, WindowedValue<KV<Object, List>>> {
   private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
   private final Map<Object, List> keyToValues;
-  private OutputCollector<KV<Object, List>> outputCollector;
+  private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
 
   /**
    * GroupByKey constructor.
@@ -40,23 +41,27 @@ public GroupByKeyTransform() {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<KV<Object, List>> oc) {
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<Object, List>>> oc) {
     this.outputCollector = oc;
   }
 
   @Override
   public void onData(final I element) {
-    final KV kv = (KV) element;
+    // TODO #129: support window in group by key for windowed groupByKey
+    final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
+    final KV kv = windowedValue.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
     keyToValues.get(kv.getKey()).add(kv.getValue());
   }
 
   @Override
   public void close() {
+    // TODO #129: support window in group by key for windowed groupByKey
     if (keyToValues.isEmpty()) {
       LOG.warn("Beam GroupByKeyTransform received no data!");
     } else {
-      keyToValues.entrySet().stream().map(entry -> KV.of(entry.getKey(), entry.getValue()))
+      keyToValues.entrySet().stream().map(entry ->
+        WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
           .forEach(outputCollector::emit);
       keyToValues.clear();
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
new file mode 100644
index 000000000..e7c91354b
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * Windowing transform implementation.
+ * This transform simply windows the given elements into
+ * finite windows according to a user-specified {@link WindowFn}.
+ * @param <T> input/output type.
+ * @param <W> window type
+ */
+public final class WindowFnTransform<T, W extends BoundedWindow>
+  implements Transform<WindowedValue<T>, WindowedValue<T>> {
+  private final WindowFn windowFn;
+  private OutputCollector<WindowedValue<T>> outputCollector;
+
+  /**
+   * Creates a transform that assigns windows with the given WindowFn.
+   * @param windowFn windowFn for the Transform.
+   */
+  public WindowFnTransform(final WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<T>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<T> windowedValue) {
+    final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows()); // expects one window
+    final T element = windowedValue.getValue();
+    final Instant timestamp = windowedValue.getTimestamp();
+
+    try {
+      final Collection<W> windows =
+        ((WindowFn<T, W>) windowFn) // the field is a raw WindowFn, hence the cast
+          .assignWindows(
+            ((WindowFn<T, W>) windowFn).new AssignContext() {
+              @Override
+              public T element() {
+                return element;
+              }
+
+              @Override
+              public Instant timestamp() {
+                return timestamp;
+              }
+
+              @Override
+              public BoundedWindow window() {
+                return boundedWindow;
+              }
+            });
+
+      // Emit a single WindowedValue carrying all assigned windows (compressed form) for efficiency
+      outputCollector.emit(WindowedValue.of(element, timestamp, windows, PaneInfo.NO_FIRING));
+    } catch (final Exception e) {
+      throw new RuntimeException(e); // rethrow unchecked, keeping the original exception as the cause
+    }
+  }
+
+  @Override
+  public void close() { // no-op
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("WindowFnTransform:" + windowFn);
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
deleted file mode 100644
index 20bc7238d..000000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-
-/**
- * Windowing transform implementation.
- * This transform simply windows the given elements into finite windows according to a user-specified WindowTransform.
- * As this functionality is unnecessary for batch processing workloads and for Runtime, this is left as below.
- * @param <T> input/output type.
- */
-public final class WindowTransform<T> implements Transform<T, T> {
-  private final WindowFn windowFn;
-  private OutputCollector<T> outputCollector;
-
-  /**
-   * Default Constructor.
-   * @param windowFn windowFn for the Transform.
-   */
-  public WindowTransform(final WindowFn windowFn) {
-    this.windowFn = windowFn;
-  }
-
-  @Override
-  public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
-  public void onData(final T element) {
-    // TODO #1: Support Beam Streaming in Compiler.
-    outputCollector.emit(element);
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("WindowTransform:" + windowFn);
-    return sb.toString();
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
new file mode 100644
index 000000000..909af4b4a
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.optimizer;
+
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.Pair;
+
+/**
+ * Extracts the key, i.e. the left value, from a {@link Pair} element.
+ */
+public final class PairKeyExtractor implements KeyExtractor {
+  @Override
+  public Object extractKey(final Object element) {
+    if (element instanceof Pair) {
+      return ((Pair) element).left();
+    } else {
+      throw new IllegalStateException(element.toString()); // non-Pair input; NOTE(review): NPE if element is null
+    }
+  }
+}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index adf573b1c..e4743b5c6 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -30,6 +30,7 @@
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import org.apache.nemo.compiler.optimizer.PairKeyExtractor;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ private IREdge generateEdgeToABV(final IREdge edge,
     newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
-    newEdge.setProperty(KeyExtractorProperty.of(edge.getPropertyValue(KeyExtractorProperty.class).get()));
+    newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
     newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
 
     // Dynamic optimization handles statistics on key-value data by default.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
index 2f87188cd..0770c00b6 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -27,7 +27,6 @@
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.transform.DoTransform;
 import org.apache.nemo.common.test.EmptyComponents;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
@@ -66,7 +65,7 @@ public void testSimplePlan() throws Exception {
     v1.setProperty(ParallelismProperty.of(3));
     irDAGBuilder.addVertex(v1);
 
-    final IRVertex v2 = new OperatorVertex(new DoTransform(null, null));
+    final IRVertex v2 = new OperatorVertex(mock(Transform.class));
     v2.setProperty(ParallelismProperty.of(2));
     irDAGBuilder.addVertex(v2);
 
@@ -113,7 +112,7 @@ public void testComplexPlan() throws Exception {
     v1.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
 
     final Transform t = mock(Transform.class);
-    final DoTransform dt = new DoTransform(null, null);
+    final Transform dt = mock(Transform.class);
     final IRVertex v2 = new OperatorVertex(t);
     v2.setProperty(ParallelismProperty.of(3));
     v2.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
new file mode 100644
index 000000000..a30ee4638
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class DoFnTransformTest {
+
+  // views and windows for testing side inputs
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+  @Before
+  public void setUp() {
+    // Each view is materialized from its own throwaway single-element pipeline.
+    view1 = Pipeline.create().apply(Create.of("1")).apply(View.asIterable());
+    view2 = Pipeline.create().apply(Create.of("2")).apply(View.asIterable());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSingleOutput() {
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new IdentityDoFn<>(),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<String> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("Hello"));
+
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+    assertEquals(WindowedValue.valueInGlobalWindow("Hello"), oc.getOutput().get(0));
+
+    doFnTransform.close();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testMultiOutputOutput() {
+    // MultiOutputDoFn sends "one" and "two" to one additional output each,
+    // and every other element to the main output as well as both additional outputs.
+    final TupleTag<String> mainOutput = new TupleTag<>("main-output");
+    final TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
+    final TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
+
+    final ImmutableList<TupleTag<?>> tags = ImmutableList.of(additionalOutput1, additionalOutput2);
+
+    // Each additional tag id maps to itself; the assertions below expect tagged outputs under the tag id.
+    final ImmutableMap<String, String> tagsMap =
+      ImmutableMap.<String, String>builder()
+        .put(additionalOutput1.getId(), additionalOutput1.getId())
+        .put(additionalOutput2.getId(), additionalOutput2.getId())
+        .build();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new MultiOutputDoFn(additionalOutput1, additionalOutput2),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        mainOutput,
+        tags,
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    // mock context
+    final Transform.Context context = mock(Transform.Context.class);
+    when(context.getTagToAdditionalChildren()).thenReturn(tagsMap);
+
+    final TestOutputCollector<String> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    for (final String input : Arrays.asList("one", "two", "hello")) {
+      doFnTransform.onData(WindowedValue.valueInGlobalWindow(input));
+    }
+
+    final WindowedValue<String> gotHello = WindowedValue.valueInGlobalWindow("got: hello");
+    final List<Tuple<String, WindowedValue<String>>> taggedOutputs = oc.getTaggedOutputs();
+
+    // main output
+    assertEquals(gotHello, oc.getOutput().get(0));
+
+    // additional output 1
+    assertTrue(taggedOutputs.contains(
+      new Tuple<>(additionalOutput1.getId(), WindowedValue.valueInGlobalWindow("extra: one"))));
+    assertTrue(taggedOutputs.contains(
+      new Tuple<>(additionalOutput1.getId(), gotHello)));
+
+    // additional output 2
+    assertTrue(taggedOutputs.contains(
+      new Tuple<>(additionalOutput2.getId(), WindowedValue.valueInGlobalWindow("extra: two"))));
+    assertTrue(taggedOutputs.contains(
+      new Tuple<>(additionalOutput2.getId(), gotHello)));
+
+    doFnTransform.close();
+  }
+
+
+  // TODO #216: implement side input and windowing
+  @Test
+  public void testSideInputs() {
+    final WindowedValue<String> first = WindowedValue.valueInGlobalWindow("first");
+    final WindowedValue<String> second = WindowedValue.valueInGlobalWindow("second");
+
+    // mock context: each view resolves to its own single-element broadcast variable
+    final Transform.Context context = mock(Transform.Context.class);
+    when(context.getBroadcastVariable(view1)).thenReturn(
+      WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
+    when(context.getBroadcastVariable(view2)).thenReturn(
+      WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
+
+    // the DoFn reads view1 for "first" and view2 for "second"
+    final Map<String, PCollectionView<Iterable<String>>> eventAndViewMap =
+      ImmutableMap.of(first.getValue(), view1, second.getValue(), view2);
+
+    final TupleTag<Tuple<String, Iterable<String>>> outputTag = new TupleTag<>("main-output");
+    final DoFnTransform<String, Tuple<String, Iterable<String>>> doFnTransform =
+      new DoFnTransform<>(
+        new SimpleSideInputDoFn<>(eventAndViewMap),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        ImmutableList.of(view1, view2), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    final TestOutputCollector<Tuple<String, Iterable<String>>> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(first);
+    doFnTransform.onData(second);
+
+    final List<WindowedValue<Tuple<String, Iterable<String>>>> outputs = oc.getOutput();
+    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))),
+      outputs.get(0));
+    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))),
+      outputs.get(1));
+
+    doFnTransform.close();
+  }
+
+  /**
+   * Output collector that records main outputs and (destination id, output) pairs for the assertions.
+   * @param <T> type of the values carried by the collected windowed values
+   */
+  private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+    // ArrayList instead of LinkedList: the tests read the recorded outputs back by index.
+    private final List<WindowedValue<T>> outputs = new ArrayList<>();
+    private final List<Tuple<String, WindowedValue<T>>> taggedOutputs = new ArrayList<>();
+
+    @Override
+    public void emit(final WindowedValue<T> output) {
+      outputs.add(output);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <O> void emit(final String dstVertexId, final O output) {
+      // Unchecked: only the raw WindowedValue type of the tagged output is verified at runtime.
+      taggedOutputs.add(new Tuple<>(dstVertexId, (WindowedValue<T>) output));
+    }
+
+    public List<WindowedValue<T>> getOutput() {
+      return outputs;
+    }
+
+    public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+      return taggedOutputs;
+    }
+  }
+
+  /**
+   * Identity do fn: outputs every input element unchanged.
+   * @param <T> type of both the input and the output elements
+   */
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+
+  /**
+   * Side input do fn: pairs every element with the side input of the view registered for that element.
+   * @param <T> type of the input elements, which are also the keys used to look up a view
+   * @param <V> type of the side input value
+   */
+  private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
+    private final Map<T, PCollectionView<V>> idAndViewMap;
+
+    public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
+      this.idAndViewMap = idAndViewMap;
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      final V sideInput = c.sideInput(idAndViewMap.get(c.element()));
+      c.output(new Tuple<>(c.element(), sideInput));
+    }
+  }
+
+
+  /**
+   * Multi output do fn: routes "one" and "two" to one additional output each, anything else to every output.
+   */
+  private static class MultiOutputDoFn extends DoFn<String, String> {
+    private final TupleTag<String> additionalOutput1;
+    private final TupleTag<String> additionalOutput2;
+
+    public MultiOutputDoFn(final TupleTag<String> additionalOutput1, final TupleTag<String> additionalOutput2) {
+      this.additionalOutput1 = additionalOutput1;
+      this.additionalOutput2 = additionalOutput2;
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      if ("one".equals(c.element())) {
+        c.output(additionalOutput1, "extra: one");
+      } else if ("two".equals(c.element())) {
+        c.output(additionalOutput2, "extra: two");
+      } else {
+        c.output("got: " + c.element());
+        c.output(additionalOutput1, "got: " + c.element());
+        c.output(additionalOutput2, "got: " + c.element());
+      }
+    }
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 01d9efe7f..b2315daf8 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -15,6 +15,8 @@
  */
 package org.apache.nemo.examples.beam;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.nemo.common.Pair;
@@ -225,7 +227,9 @@ public void processElement(final ProcessContext c) throws Exception {
     @FinishBundle
     public void finishBundle(final FinishBundleContext c) {
       for (Integer i = 0; i < gradients.size(); i++) {
-        c.output(KV.of(i, gradients.get(i)), null, null);
+        // this enforces a global window (batching),
+        // where all data elements of the corresponding PCollection are grouped and emitted downstream together
+        c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
       }
       LOG.info("stats: " + gradients.get(numClasses - 1).toString());
     }
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index b8a2dcc4a..6081a9544 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -93,7 +93,7 @@ public void setUp() throws Exception {
     runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, injector);
   }
 
-  @Test(timeout=7000)
+  @Test(timeout=60000)
   public void testExecutorRemoved() throws Exception {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
@@ -119,7 +119,7 @@ public void testExecutorRemoved() throws Exception {
     assertTrue(planStateManager.isPlanDone());
   }
 
-  @Test(timeout=7000)
+  @Test(timeout=60000)
   public void testTaskOutputWriteFailure() throws Exception {
     // Three executors are used
     executorAdded(1.0);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services