You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/21 09:03:49 UTC

[incubator-nemo] branch master updated: [NEMO-216, 251, 259] Support side inputs and windowing (#159)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9452d  [NEMO-216,251,259] Support side inputs and windowing (#159)
7d9452d is described below

commit 7d9452d31d41fdaaeed8da4d84bcb06adb32bcca
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Nov 21 18:03:45 2018 +0900

    [NEMO-216,251,259] Support side inputs and windowing (#159)
    
    JIRA: [NEMO-216: Support side inputs and windowing](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-216)
    (+ NEMO-251, NEMO-259)
    
    **Major changes:**
    - MaterializedViewReader (implements ReadyCheckingSideInputReader) and appropriate push-back logics
    - SideInputTransform: Wraps materialized views with an index in SideInputElement to distinguish between multiple side inputs destined for the same DoFn (replaces the previous usage of BroadcastVariable ids)
    - In the runtime, treat a SideInput as just another DataFetcher (do not use BroadcastManagerWorker): all side-input-specific logic stays in the Beam frontend
    
    **Minor changes to note:**
    - Clean up watermark propagation flow for DataFetchers
    - In MultiThreadParentTaskDataFetcher, let the task thread handle watermarks just like other elements, rather than handling them in the data fetcher threads
    
    **Tests for the changes:**
    - WindowedBroadcastITCase
    - DoFnTransformTest#testSideInputs
    
    **Other comments:**
    - This is the last sub-issue of the umbrella issue [NEMO-229] Support watermark; once this PR is merged, we should be ready to mark that issue as resolved
---
 .../org/apache/nemo/common/dag/DAGBuilder.java     |   9 --
 .../apache/nemo/common/punctuation/Watermark.java  |   2 +-
 .../frontend/beam/InMemorySideInputReader.java     | 108 ++++++++++++++
 .../frontend/beam/PipelineTranslationContext.java  |  98 +++++++------
 .../compiler/frontend/beam/PipelineTranslator.java |  69 ++++++---
 .../compiler/frontend/beam/SideInputElement.java   |  36 +++--
 .../{FloatArrayCoder.java => SideInputCoder.java}  |  35 ++---
 .../beam/source/BeamBoundedSourceVertex.java       |  12 +-
 .../beam/source/BeamUnboundedSourceVertex.java     |  12 +-
 .../beam/transform/AbstractDoFnTransform.java      |  99 +++++++++----
 .../BroadcastVariableSideInputReader.java          |  63 --------
 .../beam/transform/CreateViewTransform.java        |  28 ++--
 .../frontend/beam/transform/DoFnTransform.java     |  26 +---
 .../GroupByKeyAndWindowDoFnTransform.java          |  41 ++----
 .../beam/transform/GroupByKeyTransform.java        |   8 --
 .../beam/transform/PushBackDoFnTransform.java      | 160 +++++++++++++++++++++
 .../beam/transform/SideInputTransform.java         |  72 ++++++++++
 .../frontend/beam/transform/WindowFnTransform.java |   7 +-
 .../frontend/beam/BeamFrontendALSTest.java         |   2 +-
 .../frontend/beam/BeamFrontendMLRTest.java         |   4 +-
 .../beam/transform/CreateViewTransformTest.java    |   2 +
 .../frontend/beam/transform/DoFnTransformTest.java | 121 ++++++++++------
 .../GroupByKeyAndWindowDoFnTransformTest.java      |  19 ++-
 .../reshaping/LoopInvariantCodeMotionPassTest.java |   2 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |   2 -
 .../beam/AlternatingLeastSquareInefficient.java    |   2 -
 .../nemo/examples/beam}/FloatArrayCoder.java       |   2 +-
 .../apache/nemo/examples/beam}/IntArrayCoder.java  |   2 +-
 .../nemo/examples/beam/WindowedBroadcast.java      |  90 ++++++++++++
 .../nemo/examples/beam/WindowedWordCount.java      |   2 +-
 .../examples/beam/WindowedBroadcastITCase.java     |  67 +++++++++
 .../examples/beam/WindowedWordCountITCase.java     |  11 +-
 .../org/apache/nemo/runtime/executor/Executor.java |   2 +-
 .../executor/data/BroadcastManagerWorker.java      |  68 +++------
 .../DedicatedKeyPerElementPartitioner.java         |   1 +
 .../datatransfer/MultiInputWatermarkManager.java   |   1 -
 .../OperatorVertexOutputCollector.java             |   1 -
 .../executor/datatransfer/PipeOutputWriter.java    |  15 +-
 .../datatransfer/SingleInputWatermarkManager.java  |   3 +
 .../nemo/runtime/executor/task/DataFetcher.java    |   8 ++
 .../task/MultiThreadParentTaskDataFetcher.java     |   6 +-
 .../executor/task/ParentTaskDataFetcher.java       |   3 +-
 .../nemo/runtime/executor/task/TaskExecutor.java   | 100 ++++++-------
 .../executor/task/ParentTaskDataFetcherTest.java   |   7 +-
 .../runtime/executor/task/TaskExecutorTest.java    |  25 +---
 45 files changed, 948 insertions(+), 505 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index a6269a0..6a6ca4d 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -20,7 +20,6 @@ package org.apache.nemo.common.dag;
 
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
 import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.*;
@@ -258,14 +257,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
    * Helper method to check that all execution properties are correct and makes sense.
    */
   private void executionPropertyCheck() {
-    // SideInput is not compatible with Push
-    vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
-        .filter(e -> e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-        .filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
-        .forEach(e -> {
-          throw new CompileTimeOptimizationException("DAG execution property check: "
-              + "Broadcast edge is not compatible with push" + e.getId());
-        }));
     // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
     vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
         .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index e676e6e..1055f0b 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -49,7 +49,7 @@ public final class Watermark implements Serializable, Comparable<Watermark> {
 
   @Override
   public String toString() {
-    return String.valueOf(timestamp);
+    return String.valueOf("Watermark(" + timestamp + ")");
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
new file mode 100644
index 0000000..9a18b0c
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Accumulates and provides side inputs in memory.
+ */
+public final class InMemorySideInputReader implements ReadyCheckingSideInputReader {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemorySideInputReader.class.getName());
+
+  private long curWatermark = Long.MIN_VALUE;
+
+  private final Collection<PCollectionView<?>> sideInputsToRead;
+  private final Map<Pair<PCollectionView<?>, BoundedWindow>, Object> inMemorySideInputs;
+
+  public InMemorySideInputReader(final Collection<PCollectionView<?>> sideInputsToRead) {
+    this.sideInputsToRead = sideInputsToRead;
+    this.inMemorySideInputs = new HashMap<>();
+  }
+
+  @Override
+  public boolean isReady(final PCollectionView view, final BoundedWindow window) {
+    return window.maxTimestamp().getMillis() < curWatermark
+      || inMemorySideInputs.containsKey(Pair.of(view, window));
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+    // This gets called after isReady()
+    final T sideInputData = (T) inMemorySideInputs.get(Pair.of(view, window));
+    return sideInputData == null
+      // The upstream gave us an empty sideInput
+      ? ((ViewFn<Object, T>) view.getViewFn()).apply(new CreateViewTransform.MultiView<T>(Collections.emptyList()))
+      // The upstream gave us a concrete sideInput
+      : sideInputData;
+  }
+
+  @Override
+  public <T> boolean contains(final PCollectionView<T> view) {
+    return sideInputsToRead.contains(view);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputsToRead.isEmpty();
+  }
+
+  /**
+   * Stores the side input in memory to be used with main inputs.
+   * @param view of the side input.
+   * @param sideInputElement to add.
+   */
+  public void addSideInputElement(final PCollectionView<?> view,
+                                  final WindowedValue<SideInputElement<?>> sideInputElement) {
+    for (final BoundedWindow bw : sideInputElement.getWindows()) {
+      inMemorySideInputs.put(Pair.of(view, bw), sideInputElement.getValue().getSideInputValue());
+    }
+  }
+
+  /**
+   * Say a DoFn of this reader has 3 main inputs and 4 side inputs.
+   * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here
+   * is the minimum of the all 7 input streams.
+   * @param newWatermark to set.
+   */
+  public void setCurrentWatermarkOfAllMainAndSideInputs(final long newWatermark) {
+    if (curWatermark > newWatermark) {
+      // Cannot go backwards in time.
+      throw new IllegalStateException(curWatermark + " > " + newWatermark);
+    }
+
+    this.curWatermark = newWatermark;
+    // TODO #282: Handle late data
+    inMemorySideInputs.entrySet().removeIf(entry -> {
+      return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
+    });
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 722f421..d54a7cd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -35,6 +35,7 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
+import org.apache.nemo.compiler.frontend.beam.coder.SideInputCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.*;
 
 import java.util.*;
@@ -92,68 +93,83 @@ final class PipelineTranslationContext {
   }
 
   /**
+   * Say the dstIRVertex consumes three views: view0, view1, and view2.
+   *
+   * We translate that as the following:
+   * view0 -> SideInputTransform(index=0) ->
+   * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from indices to PCollectionViews)
+   * view2 -> SideInputTransform(index=2) ->
+   *
+   * @param dstVertex vertex.
+   * @param sideInputs of the vertex.
+   */
+  void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, PCollectionView<?>> sideInputs) {
+    for (final Map.Entry<Integer, PCollectionView<?>> entry : sideInputs.entrySet()) {
+      final int index = entry.getKey();
+      final PCollectionView view = entry.getValue();
+
+      final IRVertex srcVertex = pValueToProducerVertex.get(view);
+      final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index));
+      addVertex(sideInputTransformVertex);
+      final Coder viewCoder = getCoderForView(view, this);
+      final Coder windowCoder = view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
+      // First edge: view to transform
+      final IREdge firstEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, sideInputTransformVertex);
+      addEdge(firstEdge, viewCoder, windowCoder);
+
+      // Second edge: transform to the dstIRVertex
+      final IREdge secondEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+      final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
+        WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
+
+      secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
+      secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
+      builder.connectVertices(secondEdge);
+    }
+  }
+
+  /**
    * Add IR edge to the builder.
    *
    * @param dst the destination IR vertex.
    * @param input the {@link PValue} {@code dst} consumes
    */
   void addEdgeTo(final IRVertex dst, final PValue input) {
-    final Coder coder;
     if (input instanceof PCollection) {
-      coder = ((PCollection) input).getCoder();
-    } else if (input instanceof PCollectionView) {
-      coder = getCoderForView((PCollectionView) input, this);
-    } else {
-      throw new RuntimeException(String.format("While adding an edge to %s, coder for PValue %s cannot "
-        + "be determined", dst, input));
-    }
-    addEdgeTo(dst, input, coder);
-  }
+      final Coder elementCoder = ((PCollection) input).getCoder();
+      final Coder windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
+      final IRVertex src = pValueToProducerVertex.get(input);
+      if (src == null) {
+        throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input));
+      }
 
-  void addEdgeTo(final IRVertex dst, final PValue input, final Coder elementCoder) {
-    final IRVertex src = pValueToProducerVertex.get(input);
-    if (src == null) {
-      throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input));
-    }
+      final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst);
+      final IREdge edge = new IREdge(communicationPattern, src, dst);
 
-    final Coder windowCoder;
-    final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst);
-    final IREdge edge = new IREdge(communicationPattern, src, dst);
+      if (pValueToTag.containsKey(input)) {
+        edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
+      }
 
-    if (pValueToTag.containsKey(input)) {
-      edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
-    }
-    if (input instanceof PCollectionView) {
-      edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input));
-    }
-    if (input instanceof PCollection) {
-      windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
-    } else if (input instanceof PCollectionView) {
-      windowCoder = ((PCollectionView) input).getPCollection()
-        .getWindowingStrategy().getWindowFn().windowCoder();
+      addEdge(edge, elementCoder, windowCoder);
     } else {
-      throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
-        + "be determined", src, dst, input));
+      throw new IllegalStateException(input.toString());
     }
-
-    addEdgeTo(edge, elementCoder, windowCoder);
   }
 
-  void addEdgeTo(final IREdge edge,
-                 final Coder elementCoder,
-                 final Coder windowCoder) {
+  void addEdge(final IREdge edge, final Coder elementCoder, final Coder windowCoder) {
     edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
     if (elementCoder instanceof KvCoder) {
       Coder keyCoder = ((KvCoder) elementCoder).getKeyCoder();
       edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
       edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
     }
 
-    edge.setProperty(EncoderProperty.of(
-      new BeamEncoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
-    edge.setProperty(DecoderProperty.of(
-      new BeamDecoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
+    final WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder(elementCoder, windowCoder);
+    edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
+    edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
 
     builder.connectVertices(edge);
   }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7a22ba8..a6ae6ce 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
@@ -48,7 +49,9 @@ import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * A collection of translators for the Beam PTransforms.
@@ -164,7 +167,7 @@ final class PipelineTranslator {
   private static void unboundedReadTranslator(final PipelineTranslationContext ctx,
                                               final TransformHierarchy.Node beamNode,
                                               final Read.Unbounded<?> transform) {
-    final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource());
+    final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -174,7 +177,7 @@ final class PipelineTranslator {
   private static void boundedReadTranslator(final PipelineTranslationContext ctx,
                                             final TransformHierarchy.Node beamNode,
                                             final Read.Bounded<?> transform) {
-    final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource());
+    final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -184,14 +187,15 @@ final class PipelineTranslator {
   private static void parDoSingleOutputTranslator(final PipelineTranslationContext ctx,
                                                   final TransformHierarchy.Node beamNode,
                                                   final ParDo.SingleOutput<?, ?> transform) {
-    final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+    final Map<Integer, PCollectionView<?>> sideInputMap = getSideInputMap(transform.getSideInputs());
+    final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap);
     final IRVertex vertex = new OperatorVertex(doFnTransform);
 
     ctx.addVertex(vertex);
     beamNode.getInputs().values().stream()
       .filter(input -> !transform.getAdditionalInputs().values().contains(input))
       .forEach(input -> ctx.addEdgeTo(vertex, input));
-    transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+    ctx.addSideInputEdges(vertex, sideInputMap);
     beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
   }
 
@@ -199,13 +203,14 @@ final class PipelineTranslator {
   private static void parDoMultiOutputTranslator(final PipelineTranslationContext ctx,
                                                  final TransformHierarchy.Node beamNode,
                                                  final ParDo.MultiOutput<?, ?> transform) {
-    final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+    final Map<Integer, PCollectionView<?>> sideInputMap = getSideInputMap(transform.getSideInputs());
+    final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap);
     final IRVertex vertex = new OperatorVertex(doFnTransform);
     ctx.addVertex(vertex);
     beamNode.getInputs().values().stream()
       .filter(input -> !transform.getAdditionalInputs().values().contains(input))
       .forEach(input -> ctx.addEdgeTo(vertex, input));
-    transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+    ctx.addSideInputEdges(vertex, sideInputMap);
     beamNode.getOutputs().entrySet().stream()
       .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
       .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(beamNode, vertex, pValueWithTupleTag.getValue()));
@@ -237,7 +242,8 @@ final class PipelineTranslator {
     } else {
       throw new UnsupportedOperationException(String.format("%s is not supported", transform));
     }
-    final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
+    final IRVertex vertex = new OperatorVertex(
+      new WindowFnTransform(windowFn, DisplayData.from(beamNode.getTransform())));
     ctx.addVertex(vertex);
     beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -317,7 +323,7 @@ final class PipelineTranslator {
     final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
     ctx.addVertex(finalCombine);
     final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, partialCombine, finalCombine);
-    ctx.addEdgeTo(
+    ctx.addEdge(
       edge,
       KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
       input.getWindowingStrategy().getWindowFn().windowCoder());
@@ -348,27 +354,45 @@ final class PipelineTranslator {
   ////////////////////////////////////////////////////////////////////////////////////////////////////////
   /////////////////////// HELPER METHODS
 
-  private static DoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
-                                                   final TransformHierarchy.Node beamNode) {
+  private static Map<Integer, PCollectionView<?>> getSideInputMap(final List<PCollectionView<?>> viewList) {
+    return IntStream.range(0, viewList.size()).boxed().collect(Collectors.toMap(Function.identity(), viewList::get));
+  }
+
+  private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
+                                                           final TransformHierarchy.Node beamNode,
+                                                           final Map<Integer, PCollectionView<?>> sideInputMap) {
     try {
       final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
       final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
       final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
-      final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
       final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
 
       final PCollection<?> mainInput = (PCollection<?>)
         Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
 
-      return new DoFnTransform(
-        doFn,
-        mainInput.getCoder(),
-        getOutputCoders(pTransform),
-        mainOutputTag,
-        additionalOutputTags.getAll(),
-        mainInput.getWindowingStrategy(),
-        sideInputs,
-        ctx.getPipelineOptions());
+      if (sideInputMap.isEmpty()) {
+        return new DoFnTransform(
+          doFn,
+          mainInput.getCoder(),
+          getOutputCoders(pTransform),
+          mainOutputTag,
+          additionalOutputTags.getAll(),
+          mainInput.getWindowingStrategy(),
+          ctx.getPipelineOptions(),
+          DisplayData.from(beamNode.getTransform()));
+      } else {
+        return new PushBackDoFnTransform(
+          doFn,
+          mainInput.getCoder(),
+          getOutputCoders(pTransform),
+          mainOutputTag,
+          additionalOutputTags.getAll(),
+          mainInput.getWindowingStrategy(),
+          sideInputMap,
+          ctx.getPipelineOptions(),
+          DisplayData.from(beamNode.getTransform()));
+
+      }
     } catch (final IOException e) {
       throw new RuntimeException(e);
     }
@@ -404,11 +428,10 @@ final class PipelineTranslator {
       return new GroupByKeyAndWindowDoFnTransform(
         getOutputCoders(pTransform),
         mainOutputTag,
-        Collections.emptyList(),  /*  GBK does not have additional outputs */
         mainInput.getWindowingStrategy(),
-        Collections.emptyList(), /*  GBK does not have additional side inputs */
         ctx.getPipelineOptions(),
-        SystemReduceFn.buffering(mainInput.getCoder()));
+        SystemReduceFn.buffering(mainInput.getCoder()),
+        DisplayData.from(beamNode.getTransform()));
     }
   }
 
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
similarity index 54%
rename from common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
index d7e8aa4..22f8d72 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
@@ -16,31 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.common.ir.edge.executionproperty;
-
-import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
-
-import java.io.Serializable;
+package org.apache.nemo.compiler.frontend.beam;
 
 /**
- * Edges with this property fetch a broadcast variable.
+ * {@link org.apache.nemo.compiler.frontend.beam.transform.DoFnTransform} treats elements of this type as side inputs.
+ * TODO #289: Prevent using SideInputElement in UDFs
+ * @param <T> type of the side input value.
  */
-public final class BroadcastVariableIdProperty extends EdgeExecutionProperty<Serializable> {
+public final class SideInputElement<T> {
+  private final int sideInputIndex;
+  private final T sideInputValue;
+
+  public SideInputElement(final int sideInputIndex, final T sideInputValue) {
+    this.sideInputIndex = sideInputIndex;
+    this.sideInputValue = sideInputValue;
+  }
 
-  /**
-   * Constructor.
-   * @param value id.
-   */
-  private BroadcastVariableIdProperty(final Serializable value) {
-    super(value);
+  public int getSideInputIndex() {
+    return sideInputIndex;
   }
 
-  /**
-   * Static method exposing constructor.
-   * @param value id.
-   * @return the newly created execution property.
-   */
-  public static BroadcastVariableIdProperty of(final Serializable value) {
-    return new BroadcastVariableIdProperty(value);
+  public T getSideInputValue() {
+    return sideInputValue;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
similarity index 55%
copy from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
copy to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
index dff48ee..59a1792 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -19,43 +19,44 @@
 package org.apache.nemo.compiler.frontend.beam.coder;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 
 import java.io.*;
 
 /**
- * EncoderFactory for float[].
+ * EncoderFactory for side inputs.
+ * @param <T> type of the side input value.
  */
-public final class FloatArrayCoder extends AtomicCoder<float[]> {
+public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+  private final Coder<T> valueCoder;
+
   /**
    * Private constructor.
    */
-  private FloatArrayCoder() {
+  private SideInputCoder(final Coder<T> valueCoder) {
+    this.valueCoder = valueCoder;
   }
 
   /**
    * @return a new coder
    */
-  public static FloatArrayCoder of() {
-    return new FloatArrayCoder();
+  public static SideInputCoder of(final Coder valueCoder) {
+    return new SideInputCoder<>(valueCoder);
   }
 
   @Override
-  public void encode(final float[] ary, final OutputStream outStream) throws IOException {
+  public void encode(final SideInputElement<T> sideInputElement, final OutputStream outStream) throws IOException {
     final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
-    dataOutputStream.writeInt(ary.length);
-    for (float f : ary) {
-      dataOutputStream.writeFloat(f);
-    }
+    dataOutputStream.writeInt(sideInputElement.getSideInputIndex());
+    valueCoder.encode(sideInputElement.getSideInputValue(), dataOutputStream);
   }
 
   @Override
-  public float[] decode(final InputStream inStream) throws IOException {
+  public SideInputElement<T> decode(final InputStream inStream) throws IOException {
     final DataInputStream dataInputStream = new DataInputStream(inStream);
-    final int floatArrayLen = dataInputStream.readInt();
-    final float[] floatArray = new float[floatArrayLen];
-    for (int i = 0; i < floatArrayLen; i++) {
-      floatArray[i] = dataInputStream.readFloat();
-    }
-    return floatArray;
+    final int index = dataInputStream.readInt();
+    final T value = valueCoder.decode(inStream);
+    return new SideInputElement<>(index, value);
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index bc672b7..30947fd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 
@@ -42,17 +43,18 @@ import org.slf4j.LoggerFactory;
 public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue<O>> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
   private BoundedSource<O> source;
-  private final String sourceDescription;
+  private final DisplayData displayData;
 
   /**
    * Constructor of BeamBoundedSourceVertex.
    *
    * @param source BoundedSource to read from.
+   * @param displayData data to display.
    */
-  public BeamBoundedSourceVertex(final BoundedSource<O> source) {
+  public BeamBoundedSourceVertex(final BoundedSource<O> source, final DisplayData displayData) {
     super();
     this.source = source;
-    this.sourceDescription = source.toString();
+    this.displayData = displayData;
   }
 
   /**
@@ -63,7 +65,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
   public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) {
     super(that);
     this.source = that.source;
-    this.sourceDescription = that.source.toString();
+    this.displayData = that.displayData;
   }
 
   @Override
@@ -94,7 +96,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
   @Override
   public ObjectNode getPropertiesAsJsonNode() {
     final ObjectNode node = getIRVertexPropertiesAsJsonNode();
-    node.put("source", sourceDescription);
+    node.put("source", displayData.toString());
     return node;
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index ad40d1b..5942925 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
@@ -44,22 +45,23 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
 
   private static final Logger LOG = LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
   private UnboundedSource<O, M> source;
-  private final String sourceDescription;
+  private final DisplayData displayData;
 
   /**
    * The default constructor for beam unbounded source.
    * @param source unbounded source.
    */
-  public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source) {
+  public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source,
+                                   final DisplayData displayData) {
     super();
     this.source = source;
-    this.sourceDescription = source.toString();
+    this.displayData = displayData;
   }
 
   private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex<O, M> that) {
     super(that);
     this.source = that.source;
-    this.sourceDescription = that.source.toString();
+    this.displayData = that.displayData;
   }
 
   @Override
@@ -88,7 +90,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
   @Override
   public ObjectNode getPropertiesAsJsonNode() {
     final ObjectNode node = getIRVertexPropertiesAsJsonNode();
-    node.put("source", sourceDescription);
+    node.put("source", displayData.toString());
     return node;
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index dd5ca35..72113d6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -31,11 +32,12 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -52,7 +54,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
 
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
-  private final Collection<PCollectionView<?>> sideInputs;
+  private final Map<Integer, PCollectionView<?>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final DoFn<InterT, OutputT> doFn;
   private final SerializablePipelineOptions serializedOptions;
@@ -61,9 +63,13 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
 
   private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
   private transient DoFnRunner<InterT, OutputT> doFnRunner;
-  private transient SideInputReader sideInputReader;
+
+  // null when there is no side input.
+  private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;
+
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
+  private transient InMemorySideInputReader sideInputReader;
 
   // Variables for bundle.
   // We consider count and time millis for start/finish bundle.
@@ -74,6 +80,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   private long prevBundleStartTime;
   private long currBundleCount = 0;
   private boolean bundleFinished = true;
+  private final DisplayData displayData;
 
   /**
    * AbstractDoFnTransform constructor.
@@ -85,6 +92,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
    * @param windowingStrategy windowing strategy
    * @param sideInputs side inputs
    * @param options pipeline options
+   * @param displayData display data.
    */
   public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
                                final Coder<InputT> inputCoder,
@@ -92,8 +100,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
                                final TupleTag<OutputT> mainOutputTag,
                                final List<TupleTag<?>> additionalOutputTags,
                                final WindowingStrategy<?, ?> windowingStrategy,
-                               final Collection<PCollectionView<?>> sideInputs,
-                               final PipelineOptions options) {
+                               final Map<Integer, PCollectionView<?>> sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
     this.doFn = doFn;
     this.inputCoder = inputCoder;
     this.outputCoders = outputCoders;
@@ -102,28 +111,37 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
+    this.displayData = displayData;
   }
 
-  protected final DoFnRunners.OutputManager getOutputManager() {
-    return outputManager;
+  final Map<Integer, PCollectionView<?>> getSideInputs() {
+    return sideInputs;
   }
 
-  protected final WindowingStrategy getWindowingStrategy() {
-    return windowingStrategy;
+  final DoFnRunners.OutputManager getOutputManager() {
+    return outputManager;
   }
 
-  protected final SideInputReader getSideInputReader() {
-    return sideInputReader;
+  final WindowingStrategy getWindowingStrategy() {
+    return windowingStrategy;
   }
 
-  protected final TupleTag<OutputT> getMainOutputTag() {
+  final TupleTag<OutputT> getMainOutputTag() {
     return mainOutputTag;
   }
 
-  protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+  final DoFnRunner<InterT, OutputT> getDoFnRunner() {
     return doFnRunner;
   }
 
+  final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
+    return pushBackRunner;
+  }
+
+  final InMemorySideInputReader getSideInputReader() {
+    return sideInputReader;
+  }
+
   public final DoFn getDoFn() {
     return doFn;
   }
@@ -131,26 +149,51 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   /**
    * Checks whether the bundle is finished or not.
    * Starts the bundle if it is done.
+   *
+   * TODO #263: Partial Combining for Beam Streaming
+   * We may want to use separate methods for doFnRunner/pushBackRunner
+   * (same applies to the other bundle-related methods)
    */
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
     if (bundleFinished) {
       bundleFinished = false;
-      doFnRunner.startBundle();
+      if (pushBackRunner == null) {
+        doFnRunner.startBundle();
+      } else {
+        pushBackRunner.startBundle();
+      }
       prevBundleStartTime = System.currentTimeMillis();
       currBundleCount = 0;
     }
     currBundleCount += 1;
   }
 
-
   /**
    * Checks whether it is time to finish the bundle and finish it.
    */
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
     if (!bundleFinished) {
       if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
         bundleFinished = true;
+        if (pushBackRunner == null) {
+          doFnRunner.finishBundle();
+        } else {
+          pushBackRunner.finishBundle();
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
+    if (!bundleFinished) {
+      bundleFinished = true;
+      if (pushBackRunner == null) {
         doFnRunner.finishBundle();
+      } else {
+        pushBackRunner.finishBundle();
       }
     }
   }
@@ -168,11 +211,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
     outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
     // create side input reader
-    if (!sideInputs.isEmpty()) {
-      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
-    } else {
-      sideInputReader = NullSideInputReader.of(sideInputs);
-    }
+    sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values()));
 
     // this transform does not support state and timer.
     final StepContext stepContext = new StepContext() {
@@ -205,6 +244,10 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
       inputCoder,
       outputCoders,
       windowingStrategy);
+
+    pushBackRunner = sideInputs.isEmpty()
+      ? null
+      : SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(doFnRunner, sideInputs.values(), sideInputReader);
   }
 
   public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -214,12 +257,15 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   @Override
   public final void close() {
     beforeClose();
-    if (!bundleFinished) {
-      doFnRunner.finishBundle();
-    }
+    forceFinishBundle();
     doFnInvoker.invokeTeardown();
   }
 
+  @Override
+  public final String toString() {
+    return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / ");
+  }
+
   /**
    * An abstract function that wraps the original doFn.
    * @param originalDoFn the original doFn.
@@ -234,9 +280,6 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
    */
   abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
 
-  @Override
-  public abstract void onData(final WindowedValue<InputT> data);
-
   /**
    * An abstract function that is called before close.
    */
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
deleted file mode 100644
index 64460f9..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import javax.annotation.Nullable;
-import java.util.Collection;
-
-/**
- * A sideinput reader that reads/writes side input values to context.
- */
-public final class BroadcastVariableSideInputReader implements SideInputReader {
-
-  // Nemo context for storing/getting side inputs
-  private final Transform.Context context;
-
-  // The list of side inputs that we're handling
-  private final Collection<PCollectionView<?>> sideInputs;
-
-  BroadcastVariableSideInputReader(final Transform.Context context,
-                                   final Collection<PCollectionView<?>> sideInputs) {
-    this.context = context;
-    this.sideInputs = sideInputs;
-  }
-
-  @Nullable
-  @Override
-  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-    // TODO #216: implement side input and windowing
-    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
-  }
-
-  @Override
-  public <T> boolean contains(final PCollectionView<T> view) {
-    return sideInputs.contains(view);
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return sideInputs.isEmpty();
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 05e5af6..03313c4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -37,14 +37,12 @@ import java.util.*;
  * @param <I> input type
  * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> implements
-  Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
-  private OutputCollector<WindowedValue<O>> outputCollector;
+public final class CreateViewTransform<I, O> implements Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
   private final Map<BoundedWindow, List<I>> windowListMap;
 
-  // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader
-  private boolean isEmitted = false;
+  private OutputCollector<WindowedValue<O>> outputCollector;
+
   private long currentOutputWatermark;
 
   /**
@@ -75,7 +73,6 @@ public final class CreateViewTransform<I, O> implements
 
   @Override
   public void onWatermark(final Watermark inputWatermark) {
-
     // If no data, just forwards the watermark
     if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) {
       currentOutputWatermark = inputWatermark.getTimestamp();
@@ -90,11 +87,10 @@ public final class CreateViewTransform<I, O> implements
       final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
       if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) {
         // emit the windowed data if the watermark timestamp > the window max boundary
-        final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+        final O output = viewFn.apply(new MultiView<>(entry.getValue()));
         outputCollector.emit(WindowedValue.of(
-          view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
+          output, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
         iterator.remove();
-        isEmitted = true;
 
         minOutputTimestampOfEmittedWindows =
           Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis());
@@ -112,20 +108,12 @@ public final class CreateViewTransform<I, O> implements
   @Override
   public void close() {
     onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    if (!isEmitted) {
-      // TODO #259: This is an ad-hoc code to resolve the view that has no data
-      // Currently, broadCastWorker reads the view data, but it throws exception if no data is available for a view.
-      // We should use watermark value to track whether the materialized data in a view is available or not.
-      final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
-      outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
-    }
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("CreateViewTransform:" + viewFn);
+    sb.append("CreateViewTransform  " + viewFn.getClass().getName());
     return sb.toString();
   }
 
@@ -133,13 +121,13 @@ public final class CreateViewTransform<I, O> implements
    * Represents {@code PrimitiveViewT} supplied to the {@link ViewFn}.
    * @param <T> primitive view type
    */
-  public final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
+  public static final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
     private final Iterable<T> iterable;
 
     /**
      * Constructor.
      */
-    MultiView(final Iterable<T> iterable) {
+    public MultiView(final Iterable<T> iterable) {
       // Create a placeholder for side input data. CreateViewTransform#onData stores data to this list.
       this.iterable = iterable;
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 9f7a4e0..699a0dd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -21,8 +21,8 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
@@ -30,12 +30,12 @@ import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 /**
- * DoFn transform implementation.
+ * DoFn transform implementation when there is no side input.
  *
  * @param <InputT> input type.
  * @param <OutputT> output type.
@@ -45,9 +45,6 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
 
   /**
    * DoFnTransform Constructor.
-   *
-   * @param doFn    doFn.
-   * @param options Pipeline options.
    */
   public DoFnTransform(final DoFn<InputT, OutputT> doFn,
                        final Coder<InputT> inputCoder,
@@ -55,10 +52,10 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
                        final TupleTag<OutputT> mainOutputTag,
                        final List<TupleTag<?>> additionalOutputTags,
                        final WindowingStrategy<?, ?> windowingStrategy,
-                       final Collection<PCollectionView<?>> sideInputs,
-                       final PipelineOptions options) {
+                       final PipelineOptions options,
+                       final DisplayData displayData) {
     super(doFn, inputCoder, outputCoders, mainOutputTag,
-      additionalOutputTags, windowingStrategy, sideInputs, options);
+      additionalOutputTags, windowingStrategy, Collections.emptyMap(), options, displayData);
   }
 
   @Override
@@ -68,6 +65,7 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
+    // Do not need any push-back logic.
     checkAndInvokeBundle();
     getDoFnRunner().processElement(data);
     checkAndFinishBundle();
@@ -76,26 +74,16 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
   @Override
   public void onWatermark(final Watermark watermark) {
     checkAndInvokeBundle();
-    // TODO #216: We should consider push-back data that waits for side input
-    // TODO #216: If there are push-back data, input watermark >= output watermark
     getOutputCollector().emitWatermark(watermark);
     checkAndFinishBundle();
   }
 
   @Override
   protected void beforeClose() {
-    // nothing
   }
 
   @Override
   OutputCollector wrapOutputCollector(final OutputCollector oc) {
     return oc;
   }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + getDoFn());
-    return sb.toString();
-  }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 84b6835..06dde58 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -22,9 +22,9 @@ import org.apache.beam.runners.core.*;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -57,19 +57,19 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    */
   public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
                                           final TupleTag<KV<K, Iterable<InputT>>> mainOutputTag,
-                                          final List<TupleTag<?>> additionalOutputTags,
                                           final WindowingStrategy<?, ?> windowingStrategy,
-                                          final Collection<PCollectionView<?>> sideInputs,
                                           final PipelineOptions options,
-                                          final SystemReduceFn reduceFn) {
+                                          final SystemReduceFn reduceFn,
+                                          final DisplayData displayData) {
     super(null, /* doFn */
       null, /* inputCoder */
       outputCoders,
       mainOutputTag,
-      additionalOutputTags,
+      Collections.emptyList(),  /*  GBK does not have additional outputs */
       windowingStrategy,
-      sideInputs,
-      options);
+      Collections.emptyMap(), /*  GBK does not have additional side inputs */
+      options,
+      displayData);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
     this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
@@ -93,7 +93,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
         getWindowingStrategy(),
         inMemoryStateInternalsFactory,
         inMemoryTimerInternalsFactory,
-        getSideInputReader(),
+        null, // GBK has no sideinput.
         reduceFn,
         getOutputManager(),
         getMainOutputTag());
@@ -163,23 +163,19 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    * @param inputWatermark input watermark
    */
   private void emitOutputWatermark(final Watermark inputWatermark) {
-
-    if (keyAndWatermarkHoldMap.isEmpty()) {
-      return;
-    }
-
     // Find min watermark hold
-    final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
+    final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+      ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
+      : Collections.min(keyAndWatermarkHoldMap.values());
+    final Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Watermark hold: {}, "
         + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
     }
 
-    final Watermark outputWatermarkCandidate = new Watermark(
-      Math.max(prevOutputWatermark.getTimestamp(),
-        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
-
     if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
       // progress!
       prevOutputWatermark = outputWatermarkCandidate;
@@ -211,8 +207,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     // Finish any pending windows by advancing the input watermark to infinity.
     processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
       BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
-    // Emit watermark to downstream operators
-    emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
   }
 
   /**
@@ -258,13 +252,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     }
   }
 
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("GroupByKeyAndWindowDoFnTransform:");
-    return sb.toString();
-  }
-
   /**
    * Get timer data.
    */
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0f4cf5b..71c68ea 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -69,12 +69,4 @@ public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I, Wi
       }
     }
   }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("GroupByKeyTransform:");
-    sb.append(super.toString());
-    return sb.toString();
-  }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
new file mode 100644
index 0000000..d8f0d8f
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class PushBackDoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+  private List<WindowedValue<InputT>> curPushedBacks;
+  private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists.
+  private long curInputWatermark;
+  private long curOutputWatermark;
+
+  /**
+   * PushBackDoFnTransform Constructor.
+   */
+  public PushBackDoFnTransform(final DoFn<InputT, OutputT> doFn,
+                               final Coder<InputT> inputCoder,
+                               final Map<TupleTag<?>, Coder<?>> outputCoders,
+                               final TupleTag<OutputT> mainOutputTag,
+                               final List<TupleTag<?>> additionalOutputTags,
+                               final WindowingStrategy<?, ?> windowingStrategy,
+                               final Map<Integer, PCollectionView<?>> sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options, displayData);
+    this.curPushedBacks = new ArrayList<>();
+    this.curPushedBackWatermark = Long.MAX_VALUE;
+    this.curInputWatermark = Long.MIN_VALUE;
+    this.curOutputWatermark = Long.MIN_VALUE;
+  }
+
+  @Override
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
+  }
+
+  @Override
+  public void onData(final WindowedValue data) {
+    // Need to distinguish side/main inputs and push-back main inputs.
+    if (data.getValue() instanceof SideInputElement) {
+      // This element is a Side Input
+      // TODO #287: Consider Explicit Multi-Input IR Transform
+      final WindowedValue<SideInputElement> sideInputElement = (WindowedValue<SideInputElement>) data;
+      final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+      getSideInputReader().addSideInputElement(view, data);
+
+      handlePushBacks();
+
+      // See if we can emit a new watermark, as we may have processed some pushed-back elements
+      onWatermark(new Watermark(curInputWatermark));
+    } else {
+      // This element is the Main Input
+      checkAndInvokeBundle();
+      final Iterable<WindowedValue<InputT>> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(data);
+      for (final WindowedValue wv : pushedBack) {
+        curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis());
+        curPushedBacks.add(wv);
+      }
+      checkAndFinishBundle();
+    }
+  }
+
+  private void handlePushBacks() {
+    // Force-finish, before (possibly) processing pushed-back data.
+    //
+    // Main reason:
+    // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+    // caches for each bundle the side inputs that are not ready.
+    // We need to re-start the bundle to advertise the (possibly) newly available side input.
+    forceFinishBundle(); // forced
+
+    // With the new side input added, we may be able to process some pushed-back elements.
+    final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
+    long pushedBackAgainWatermark = Long.MAX_VALUE;
+    for (final WindowedValue<InputT> curPushedBack : curPushedBacks) {
+      checkAndInvokeBundle();
+      final Iterable<WindowedValue<InputT>> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(curPushedBack);
+      checkAndFinishBundle();
+      for (final WindowedValue<InputT> wv : pushedBack) {
+        pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, wv.getTimestamp().getMillis());
+        pushedBackAgain.add(wv);
+      }
+    }
+    curPushedBacks = pushedBackAgain;
+    curPushedBackWatermark = pushedBackAgainWatermark;
+  }
+
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    // TODO #298: Consider Processing DoFn PushBacks on Watermark
+    checkAndInvokeBundle();
+    curInputWatermark = watermark.getTimestamp();
+    getSideInputReader().setCurrentWatermarkOfAllMainAndSideInputs(curInputWatermark);
+
+    final long outputWatermarkCandidate = Math.min(curInputWatermark, curPushedBackWatermark);
+    if (outputWatermarkCandidate > curOutputWatermark) {
+      // Watermark advances!
+      getOutputCollector().emitWatermark(new Watermark(outputWatermarkCandidate));
+      curOutputWatermark = outputWatermarkCandidate;
+    }
+    checkAndFinishBundle();
+  }
+
+  @Override
+  protected void beforeClose() {
+    // This makes all unavailable side inputs as available empty side inputs.
+    onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    // All push-backs should be processed here.
+    handlePushBacks();
+  }
+
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    return oc;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
new file mode 100644
index 0000000..b1536e6
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+
+/**
+ * Side input transform implementation.
+ * TODO #297: Consider Removing SideInputTransform
+ * @param <T> input/output type.
+ */
+public final class SideInputTransform<T> implements Transform<WindowedValue<T>, WindowedValue<SideInputElement<T>>> {
+  private OutputCollector<WindowedValue<SideInputElement<T>>> outputCollector;
+  private final int index;
+
+  /**
+   * Constructor.
+   */
+  public SideInputTransform(final int index) {
+    this.index = index;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<SideInputElement<T>>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<T> element) {
+    outputCollector.emit(element.withValue(new SideInputElement<>(index, element.getValue())));
+  }
+
+  @Override
+  public void onWatermark(final Watermark watermark) {
+    outputCollector.emitWatermark(watermark);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("SideInputTransform:");
+    sb.append("(index-");
+    sb.append(String.valueOf(index));
+    sb.append(")");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index a434618..f8f5c9f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam.transform;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -40,14 +41,16 @@ import java.util.Collection;
 public final class WindowFnTransform<T, W extends BoundedWindow>
   implements Transform<WindowedValue<T>, WindowedValue<T>> {
   private final WindowFn windowFn;
+  private final DisplayData displayData;
   private OutputCollector<WindowedValue<T>> outputCollector;
 
   /**
    * Default Constructor.
    * @param windowFn windowFn for the Transform.
    */
-  public WindowFnTransform(final WindowFn windowFn) {
+  public WindowFnTransform(final WindowFn windowFn, final DisplayData displayData) {
     this.windowFn = windowFn;
+    this.displayData = displayData;
   }
 
   @Override
@@ -101,7 +104,7 @@ public final class WindowFnTransform<T, W extends BoundedWindow>
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("WindowFnTransform:" + windowFn);
+    sb.append("WindowFnTransform / " + displayData.toString().replaceAll(":", " / "));
     return sb.toString();
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index be3f008..278c83c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -41,7 +41,7 @@ public final class BeamFrontendALSTest {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
-    assertEquals(38, producedDAG.getVertices().size());
+    assertEquals(44, producedDAG.getVertices().size());
 
 //    producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId()));
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 632c30a..c6f100d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -41,7 +41,7 @@ public class BeamFrontendMLRTest {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
-    assertEquals(36, producedDAG.getVertices().size());
+    assertEquals(39, producedDAG.getVertices().size());
 
     final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
@@ -51,6 +51,6 @@ public class BeamFrontendMLRTest {
     final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
-    assertEquals(2, producedDAG.getOutgoingEdgesOf(vertexY).size());
+    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
   }
 }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
index 762e327..702ca9d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -21,6 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -54,6 +55,7 @@ public final class CreateViewTransformTest {
   public void test() {
 
     final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+
     final CreateViewTransform<String, Integer> viewTransform =
       new CreateViewTransform(new SumViewFn());
 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bad7584..fa1169c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,14 +20,14 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -36,17 +36,17 @@ import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 import org.apache.reef.io.Tuple;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.*;
 
-import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public final class DoFnTransformTest {
 
@@ -78,8 +78,8 @@ public final class DoFnTransformTest {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -112,8 +112,8 @@ public final class DoFnTransformTest {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        pipelineOptions);
+        pipelineOptions,
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -156,8 +156,8 @@ public final class DoFnTransformTest {
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        pipelineOptions);
+        pipelineOptions,
+        DisplayData.none());
 
     final Transform.Context context = mock(Transform.Context.class);
     final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -208,8 +208,8 @@ public final class DoFnTransformTest {
         mainOutput,
         tags,
         WindowingStrategy.globalDefault(),
-        emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
@@ -244,48 +244,70 @@ public final class DoFnTransformTest {
     doFnTransform.close();
   }
 
-  // TODO #216: implement side input and windowing
   @Test
   public void testSideInputs() {
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
-    when(context.getBroadcastVariable(view1)).thenReturn(
-      WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
-    when(context.getBroadcastVariable(view2)).thenReturn(
-      WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
-
     TupleTag<Tuple<String, Iterable<String>>> outputTag = new TupleTag<>("main-output");
 
-    WindowedValue<String> first = WindowedValue.valueInGlobalWindow("first");
-    WindowedValue<String> second = WindowedValue.valueInGlobalWindow("second");
+    WindowedValue<String> firstElement = WindowedValue.valueInGlobalWindow("first");
+    WindowedValue<String> secondElement = WindowedValue.valueInGlobalWindow("second");
 
-    final Map<String, PCollectionView<Iterable<String>>> eventAndViewMap =
-      ImmutableMap.of(first.getValue(), view1, second.getValue(), view2);
+    SideInputElement firstSideinput = new SideInputElement<>(0, ImmutableList.of("1"));
+    SideInputElement secondSideinput = new SideInputElement<>(1, ImmutableList.of("2"));
 
-    final DoFnTransform<String, Tuple<String, Iterable<String>>> doFnTransform =
-      new DoFnTransform<>(
-        new SimpleSideInputDoFn<>(eventAndViewMap),
+    final Map<Integer, PCollectionView<?>> sideInputMap = new HashMap<>();
+    sideInputMap.put(firstSideinput.getSideInputIndex(), view1);
+    sideInputMap.put(secondSideinput.getSideInputIndex(), view2);
+    final PushBackDoFnTransform<String, String> doFnTransform =
+      new PushBackDoFnTransform(
+        new SimpleSideInputDoFn<String>(view1, view2),
         NULL_INPUT_CODER,
         NULL_OUTPUT_CODERS,
         outputTag,
         Collections.emptyList(),
         WindowingStrategy.globalDefault(),
-        ImmutableList.of(view1, view2), /* side inputs */
-        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+        sideInputMap, /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        DisplayData.none());
 
-    final OutputCollector<WindowedValue<Tuple<String, Iterable<String>>>> oc = new TestOutputCollector<>();
+    final TestOutputCollector<String> oc = new TestOutputCollector<>();
     doFnTransform.prepare(context, oc);
 
-    doFnTransform.onData(first);
-    doFnTransform.onData(second);
-
-    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))),
-      ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(0));
-
-    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))),
-      ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(1));
-
+    // Main input first, Side inputs later
+    doFnTransform.onData(firstElement);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow(firstSideinput));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow(secondSideinput));
+    assertEquals(
+      WindowedValue.valueInGlobalWindow(
+        concat(firstElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())),
+      oc.getOutput().get(0));
+
+    // Side inputs first, Main input later
+    doFnTransform.onData(secondElement);
+    assertEquals(
+      WindowedValue.valueInGlobalWindow(
+        concat(secondElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())),
+      oc.getOutput().get(1));
+
+    // There should be only 2 final outputs
+    assertEquals(2, oc.getOutput().size());
+
+    // The side inputs should be "READY"
+    assertTrue(doFnTransform.getSideInputReader().isReady(view1, GlobalWindow.INSTANCE));
+    assertTrue(doFnTransform.getSideInputReader().isReady(view2, GlobalWindow.INSTANCE));
+
+    // This watermark should remove the side inputs. (Now should be "NOT READY")
+    doFnTransform.onWatermark(new Watermark(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    Iterable materializedSideInput1 = doFnTransform.getSideInputReader().get(view1, GlobalWindow.INSTANCE);
+    Iterable materializedSideInput2 = doFnTransform.getSideInputReader().get(view2, GlobalWindow.INSTANCE);
+    assertFalse(materializedSideInput1.iterator().hasNext());
+    assertFalse(materializedSideInput2.iterator().hasNext());
+
+    // There should be only 2 final outputs
     doFnTransform.close();
+    assertEquals(2, oc.getOutput().size());
   }
 
 
@@ -334,21 +356,30 @@ public final class DoFnTransformTest {
    * Side input do fn.
    * @param <T> type
    */
-  private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
-    private final Map<T, PCollectionView<V>> idAndViewMap;
-
-    public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
-      this.idAndViewMap = idAndViewMap;
+  private static class SimpleSideInputDoFn<T> extends DoFn<T, String> {
+    private final PCollectionView<?> view1;
+    private final PCollectionView<?> view2;
+
+    public SimpleSideInputDoFn(final PCollectionView<?> view1,
+                               final PCollectionView<?> view2) {
+      this.view1 = view1;
+      this.view2 = view2;
     }
 
     @ProcessElement
     public void processElement(final ProcessContext c) throws Exception {
-      final PCollectionView<V> view = idAndViewMap.get(c.element());
-      final V sideInput = c.sideInput(view);
-      c.output(new Tuple<>(c.element(), sideInput));
+      final T element = c.element();
+      final Object view1Value = c.sideInput(view1);
+      final Object view2Value = c.sideInput(view2);
+
+      c.output(concat(element, view1Value, view2Value));
     }
   }
 
+  private static String concat(final Object obj1, final Object obj2, final Object obj3) {
+    return obj1.toString() + " / " + obj2 + " / " + obj3;
+  }
+
 
   /**
    * Multi output do fn.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index f9a44ec..474c79c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,6 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.*;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -89,11 +90,10 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
       new GroupByKeyAndWindowDoFnTransform(
         NULL_OUTPUT_CODERS,
         outputTag,
-        Collections.emptyList(), /* additional outputs */
         WindowingStrategy.of(slidingWindows),
-        emptyList(), /* side inputs */
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
-        SystemReduceFn.buffering(NULL_INPUT_CODER));
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
 
     final Instant ts1 = new Instant(1);
     final Instant ts2 = new Instant(100);
@@ -167,10 +167,17 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
     doFnTransform.onData(WindowedValue.of(
       KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
 
-    // do not emit anything
+
     doFnTransform.onWatermark(watermark2);
-    assertEquals(0, oc.outputs.size());
-    assertEquals(0, oc.watermarks.size());
+
+    assertEquals(0, oc.outputs.size()); // do not emit anything
+    assertEquals(1, oc.watermarks.size());
+
+    // check output watermark
+    assertEquals(1400,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.watermarks.clear();
 
     doFnTransform.onData(WindowedValue.of(
       KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index bdc2a85..e2f2c0a 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -63,7 +63,7 @@ public class LoopInvariantCodeMotionPassTest {
     final LoopVertex alsLoop = alsLoopOpt.get();
 
     final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3);
-    final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(4);
+    final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(5);
 
     final Set<IREdge> oldDAGIncomingEdges = alsLoop.getDagIncomingEdges().get(vertex15);
     final List<IREdge> newDAGIncomingEdge = groupedDAG.getIncomingEdgesOf(vertex7);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 84da0dd..76bdc03 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -21,8 +21,6 @@ package org.apache.nemo.examples.beam;
 import com.github.fommil.netlib.BLAS;
 import com.github.fommil.netlib.LAPACK;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index e431019..ab1760f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -18,8 +18,6 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
index dff48ee..104e994 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
index ac9205b..f720137 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.coders.AtomicCoder;
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
new file mode 100644
index 0000000..30ee405
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * A windowed broadcast application that verifies windowed side inputs.
+ */
+public final class WindowedBroadcast {
+  /**
+   * Private Constructor.
+   */
+  private WindowedBroadcast() {
+  }
+
+  private static PCollection<Long> getSource(final Pipeline p) {
+    return p.apply(GenerateSequence
+      .from(1)
+      .withRate(2, Duration.standardSeconds(1))
+      .withTimestampFn(num -> new Instant(num * 500))); // 0.5 second between subsequent elements
+  }
+  /**
+   * Main function for the windowed broadcast BEAM program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String outputFilePath = args[0];
+
+    final Window<Long> windowFn = Window
+      .<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
+      .every(Duration.standardSeconds(1)));
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("WindowedBroadcast");
+
+    final Pipeline p = Pipeline.create(options);
+
+    final PCollection<Long> windowedElements = getSource(p).apply(windowFn);
+    final PCollectionView<List<Long>> windowedView = windowedElements.apply(View.asList());
+
+    windowedElements.apply(ParDo.of(new DoFn<Long, String>() {
+        @ProcessElement
+        public void processElement(final ProcessContext c) {
+          final Long anElementInTheWindow = c.element();
+          final List<Long> allElementsInTheWindow = c.sideInput(windowedView);
+          System.out.println(anElementInTheWindow + " / " + allElementsInTheWindow);
+          if (!allElementsInTheWindow.contains(anElementInTheWindow)) {
+            throw new RuntimeException(anElementInTheWindow + " not in " + allElementsInTheWindow.toString());
+          } else {
+            c.output(anElementInTheWindow + " is in " + allElementsInTheWindow);
+          }
+        }
+      }).withSideInputs(windowedView)
+    ).apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
+    p.run();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index d7f8c85..0f13dc4 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -78,7 +78,7 @@ public final class WindowedWordCount {
       return p.apply(GenerateSequence
         .from(1)
         .withRate(2, Duration.standardSeconds(1))
-        .withTimestampFn(num -> new Instant(num * 500)))
+        .withTimestampFn(num -> new Instant(num * 500))) // 0.5 second between subsequent elements
         .apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
           @Override
           public KV<String, Long> apply(final Long val) {
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
new file mode 100644
index 0000000..5e2fba3
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.StreamingPolicyParallelismFive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test windowed broadcast program with JobLauncher.
+ * TODO #291: ITCase for Empty PCollectionViews
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedBroadcastITCase {
+
+  private static final int TIMEOUT = 120000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String outputFileName = "test_output_windowed_broadcast";
+  private static final String expectedOutputFileName = "expected_output_windowed_broadcast";
+  private static final String expectedSlidingWindowOutputFileName = "expected_output_sliding_windowed_broadcast";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String outputFilePath =  fileBasePath + outputFileName;
+
+  // TODO #271: We currently disable this test because we cannot force close Nemo
+  // @Test (timeout = TIMEOUT)
+  public void testUnboundedSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedBroadcast.class.getCanonicalName())
+      .addUserArgs(outputFilePath);
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedBroadcastITCase.class.getSimpleName())
+      .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+    } finally {
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index c0134aa..7ef0e02 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -33,6 +33,7 @@ import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_UNBOUND
 
 /**
  * Test Windowed word count program with JobLauncher.
+ * TODO #299: WindowedWordCountITCase Hangs (Heisenbug)
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -58,7 +59,7 @@ public final class WindowedWordCountITCase {
 
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
-        .addJobId(WindowedWordCountITCase.class.getSimpleName())
+        .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchFixedWindow")
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
 
@@ -78,7 +79,7 @@ public final class WindowedWordCountITCase {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchSlidingWindow")
       .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
       .build());
 
@@ -98,7 +99,7 @@ public final class WindowedWordCountITCase {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeFixedWindow")
       .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
       .build());
 
@@ -119,7 +120,7 @@ public final class WindowedWordCountITCase {
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
-      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeSlidingWindow")
       .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
       .build());
 
@@ -132,7 +133,7 @@ public final class WindowedWordCountITCase {
 
 
   // TODO #271: We currently disable this test because we cannot force close Nemo
-  //@Test (timeout = TIMEOUT)
+  // @Test (timeout = TIMEOUT)
   public void testUnboundedSlidingWindow() throws Exception {
     builder = new ArgBuilder()
       .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index f7aa184..f972ce0 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -120,7 +120,7 @@ public final class Executor {
    * @param task to launch.
    */
   private void launchTask(final Task task) {
-    LOG.info("Launch task: {}", task);
+    LOG.info("Launch task: {}", task.getTaskId());
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
           SerializationUtils.deserialize(task.getSerializedIRDag());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
index 17a62c5..42806b7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
@@ -27,7 +27,6 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.reef.tang.annotations.Parameter;
@@ -36,9 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.io.Serializable;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -50,7 +47,6 @@ public final class BroadcastManagerWorker {
   private static final Logger LOG = LoggerFactory.getLogger(BroadcastManagerWorker.class.getName());
   private static BroadcastManagerWorker staticReference;
 
-  private final ConcurrentHashMap<Serializable, InputReader> idToReader;
   private final LoadingCache<Serializable, Object> idToVariableCache;
 
   /**
@@ -63,66 +59,34 @@ public final class BroadcastManagerWorker {
    */
   @Inject
   private BroadcastManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
-                                final PersistentConnectionToMasterMap toMaster) {
+                                 final PersistentConnectionToMasterMap toMaster) {
     staticReference = this;
-    this.idToReader = new ConcurrentHashMap<>();
     this.idToVariableCache = CacheBuilder.newBuilder()
       .maximumSize(100)
       .expireAfterWrite(10, TimeUnit.MINUTES)
       .build(
         new CacheLoader<Serializable, Object>() {
           public Object load(final Serializable id) throws Exception {
-            LOG.info("Start to load broadcast {}", id.toString());
-            if (idToReader.containsKey(id)) {
-              // Get from reader
-              final InputReader inputReader = idToReader.get(id);
-              final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> iterators = inputReader.read();
-              if (iterators.size() != 1) {
-                throw new IllegalStateException(id.toString());
-              }
-              final DataUtil.IteratorWithNumBytes iterator = iterators.get(0).get();
-              if (!iterator.hasNext()) {
-                throw new IllegalStateException(id.toString() + " (no element) " + iterator.toString());
-              }
-              final Object result = iterator.next();
-              if (iterator.hasNext()) {
-                throw new IllegalStateException(id.toString() + " (more than single element) " + iterator.toString());
-              }
-              return result;
-            } else {
-              // Get from master
-              final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
-                .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
-                  ControlMessage.Message.newBuilder()
-                    .setId(RuntimeIdManager.generateMessageId())
-                    .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-                    .setType(ControlMessage.MessageType.RequestBroadcastVariable)
-                    .setRequestbroadcastVariableMsg(
-                      ControlMessage.RequestBroadcastVariableMessage.newBuilder()
-                        .setExecutorId(executorId)
-                        .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
-                        .build())
-                    .build());
-              return SerializationUtils.deserialize(
-                responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
-            }
+            // Get from master
+            final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
+              .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
+                ControlMessage.Message.newBuilder()
+                  .setId(RuntimeIdManager.generateMessageId())
+                  .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+                  .setType(ControlMessage.MessageType.RequestBroadcastVariable)
+                  .setRequestbroadcastVariableMsg(
+                    ControlMessage.RequestBroadcastVariableMessage.newBuilder()
+                      .setExecutorId(executorId)
+                      .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
+                      .build())
+                  .build());
+            return SerializationUtils.deserialize(
+              responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
           }
         });
   }
 
   /**
-   * When the broadcast variable can be read by an input reader.
-   * (i.e., the variable is expressed as an IREdge, and reside in a executor as a block)
-   *
-   * @param id of the broadcast variable.
-   * @param inputReader the {@link InputReader} to register.
-   */
-  public void registerInputReader(final Serializable id,
-                                  final InputReader inputReader) {
-    this.idToReader.put(id, inputReader);
-  }
-
-  /**
    * Get the variable with the id.
    * @param id of the variable.
    * @return the variable.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
index ed6073e..43d11cf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -33,6 +33,7 @@ public final class DedicatedKeyPerElementPartitioner implements Partitioner<Inte
    * Constructor.
    */
   public DedicatedKeyPerElementPartitioner() {
+    // TODO #288: DedicatedKeyPerElementPartitioner should not always return 0
     key = 0;
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 613eccc..0402fd4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -62,7 +62,6 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
 
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("Track watermark {} emitted from edge {}:, {}", watermark.getTimestamp(), edgeIndex,
         watermarks.toString());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 12d9932..74c3ffb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -105,7 +105,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public void emitWatermark(final Watermark watermark) {
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index dd70394..03d7470 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -78,7 +78,6 @@ public final class PipeOutputWriter implements OutputWriter {
 
   private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
     pipeList.forEach(pipe -> {
-
       try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = pipe.newOutputStream()) {
         // Serialize (Do not compress)
         final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
@@ -149,10 +148,14 @@ public final class PipeOutputWriter implements OutputWriter {
   }
 
   private List<ByteOutputContext> getPipeToWrite(final Object element) {
-    return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
-      .get()
-      .equals(CommunicationPatternProperty.Value.OneToOne)
-      ? Collections.singletonList(pipes.get(0))
-      : Collections.singletonList(pipes.get((int) partitioner.partition(element)));
+    final CommunicationPatternProperty.Value comm =
+      (CommunicationPatternProperty.Value) runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get();
+    if (comm.equals(CommunicationPatternProperty.Value.OneToOne)) {
+      return Collections.singletonList(pipes.get(0));
+    } else if (comm.equals(CommunicationPatternProperty.Value.BroadCast)) {
+      return pipes;
+    } else {
+      return Collections.singletonList(pipes.get((int) partitioner.partition(element)));
+    }
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index e8135f9..e3562ed 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -20,12 +20,15 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * This is a special implementation for single input data stream for optimization.
  */
 public final class SingleInputWatermarkManager implements InputWatermarkManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SingleInputWatermarkManager.class.getName());
 
   private final OutputCollector watermarkCollector;
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 2ca3df8..215a6a8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -31,6 +31,10 @@ abstract class DataFetcher implements AutoCloseable {
   private final IRVertex dataSource;
   private final OutputCollector outputCollector;
 
+  /**
+   * @param dataSource to fetch from.
+   * @param outputCollector for the data fetched.
+   */
   DataFetcher(final IRVertex dataSource,
               final OutputCollector outputCollector) {
     this.dataSource = dataSource;
@@ -48,4 +52,8 @@ abstract class DataFetcher implements AutoCloseable {
   OutputCollector getOutputCollector() {
     return outputCollector;
   }
+
+  IRVertex getDataSource() {
+    return dataSource;
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index 001060c..7ce1ed9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -63,6 +63,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
   // A watermark manager
   private InputWatermarkManager inputWatermarkManager;
 
+
   MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
                                    final InputReader readerForParentTask,
                                    final OutputCollector outputCollector) {
@@ -113,8 +114,6 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-
-
             if (element instanceof WatermarkWithIndex) {
               // watermark element
               // the input watermark manager is accessed by multiple threads
@@ -177,17 +176,14 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
    * It receives the watermark from InputWatermarkManager.
    */
   private final class WatermarkCollector implements OutputCollector {
-
     @Override
     public void emit(final Object output) {
       throw new IllegalStateException("Should not be called");
     }
-
     @Override
     public void emitWatermark(final Watermark watermark) {
       elementQueue.offer(watermark);
     }
-
     @Override
     public void emit(final String dstVertexId, final Object output) {
       throw new IllegalStateException("Should not be called");
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 80bbea2..d3c223f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -50,7 +50,8 @@ class ParentTaskDataFetcher extends DataFetcher {
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+  ParentTaskDataFetcher(final IRVertex dataSource,
+                        final InputReader readerForParentTask,
                         final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
     this.readersForParentTask = readerForParentTask;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 518bff3..b08b12a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -25,12 +25,11 @@ import org.apache.nemo.common.dag.Edge;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -71,7 +70,7 @@ public final class TaskExecutor {
   private boolean isExecuted;
   private final String taskId;
   private final TaskStateManager taskStateManager;
-  private final List<DataFetcher> nonBroadcastDataFetchers;
+  private final List<DataFetcher> dataFetchers;
   private final BroadcastManagerWorker broadcastManagerWorker;
   private final List<VertexHarness> sortedHarnesses;
 
@@ -121,7 +120,7 @@ public final class TaskExecutor {
 
     // Prepare data structures
     final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
-    this.nonBroadcastDataFetchers = pair.left();
+    this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
   }
 
@@ -188,7 +187,6 @@ public final class TaskExecutor {
     // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
     reverseTopologicallySorted.forEach(childVertex -> {
-
       if (childVertex instanceof OperatorVertex) {
         final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
         if (edges.size() == 1) {
@@ -201,11 +199,10 @@ public final class TaskExecutor {
               new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         }
       }
-
     });
 
     // Create a harness for each vertex
-    final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
+    final List<DataFetcher> dataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
 
     reverseTopologicallySorted.forEach(irVertex -> {
@@ -250,38 +247,17 @@ public final class TaskExecutor {
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
-          (SourceVertex) irVertex, sourceReader.get(), outputCollector));
+        dataFetcherList.add(new SourceVertexDataFetcher(
+          (SourceVertex) irVertex,
+          sourceReader.get(),
+          outputCollector));
       }
 
-      // Parent-task read (broadcasts)
-      final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
-        .stream()
-        .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId()))
-        .collect(Collectors.toList());
-      final List<StageEdge> broadcastInEdges = inEdgesForThisVertex
-        .stream()
-        .filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-        .collect(Collectors.toList());
-      final List<InputReader> broadcastReaders =
-        getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory);
-      if (broadcastInEdges.size() != broadcastReaders.size()) {
-        throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString());
-      }
-      for (int i = 0; i < broadcastInEdges.size(); i++) {
-        final StageEdge inEdge = broadcastInEdges.get(i);
-        broadcastManagerWorker.registerInputReader(
-          inEdge.getPropertyValue(BroadcastVariableIdProperty.class)
-            .orElseThrow(() -> new IllegalStateException(inEdge.toString())),
-          broadcastReaders.get(i));
-      }
-
-      // Parent-task read (non-broadcasts)
-      final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
-      nonBroadcastInEdges.removeAll(broadcastInEdges);
-
-      nonBroadcastInEdges
+      // Parent-task read
+      // TODO #285: Cache broadcasted data
+      task.getTaskIncomingEdges()
         .stream()
+        .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex
         .map(incomingEdge ->
           Pair.of(incomingEdge, intermediateDataIOFactory
             .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
@@ -291,14 +267,21 @@ public final class TaskExecutor {
             final int edgeIndex = edgeIndexMap.get(edge);
             final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
             final InputReader parentTaskReader = pair.right();
+            final OutputCollector dataFetcherOutputCollector =
+              new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager);
+
             if (parentTaskReader instanceof PipeInputReader) {
-              nonBroadcastDataFetcherList.add(
-                new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+              dataFetcherList.add(
+                new MultiThreadParentTaskDataFetcher(
+                  parentTaskReader.getSrcIrVertex(),
+                  parentTaskReader,
+                  dataFetcherOutputCollector));
             } else {
-              nonBroadcastDataFetcherList.add(
-                new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+              dataFetcherList.add(
+                new ParentTaskDataFetcher(
+                  parentTaskReader.getSrcIrVertex(),
+                  parentTaskReader,
+                  dataFetcherOutputCollector));
             }
           }
         });
@@ -309,7 +292,7 @@ public final class TaskExecutor {
       .map(vertex -> vertexIdToHarness.get(vertex.getId()))
       .collect(Collectors.toList());
 
-    return Pair.of(nonBroadcastDataFetcherList, sortedHarnessList);
+    return Pair.of(dataFetcherList, sortedHarnessList);
   }
 
   /**
@@ -319,7 +302,8 @@ public final class TaskExecutor {
     outputCollector.emit(dataElement);
   }
 
-  private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) {
+  private void processWatermark(final OutputCollector outputCollector,
+                                final Watermark watermark) {
     outputCollector.emitWatermark(watermark);
   }
 
@@ -338,7 +322,7 @@ public final class TaskExecutor {
 
   /**
    * The task is executed in the following two phases.
-   * - Phase 1: Consume task-external input data (non-broadcasts)
+   * - Phase 1: Consume task-external input data
    * - Phase 2: Finalize task-internal states and data elements
    */
   private void doExecute() {
@@ -349,8 +333,8 @@ public final class TaskExecutor {
     LOG.info("{} started", taskId);
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
 
-    // Phase 1: Consume task-external input data. (non-broadcasts)
-    if (!handleDataFetchers(nonBroadcastDataFetchers)) {
+    // Phase 1: Consume task-external input data.
+    if (!handleDataFetchers(dataFetchers)) {
       return;
     }
 
@@ -383,14 +367,14 @@ public final class TaskExecutor {
   }
 
   /**
-   * Process an element generated from the dataFetcher.
-   * If the element is an instance of Finishmark, we remove the dataFetcher from the current list.
-   * @param element element
+   * Process an event generated from the dataFetcher.
+   * If the event is an instance of Finishmark, we remove the dataFetcher from the current list.
+   * @param event event
    * @param dataFetcher current data fetcher
    */
-  private void handleElement(final Object element,
-                             final DataFetcher dataFetcher) {
-    if (element instanceof Finishmark) {
+  private void onEventFromDataFetcher(final Object event,
+                                      final DataFetcher dataFetcher) {
+    if (event instanceof Finishmark) {
       // We've consumed all the data from this data fetcher.
       if (dataFetcher instanceof SourceVertexDataFetcher) {
         boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
@@ -401,12 +385,12 @@ public final class TaskExecutor {
         serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
         encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
       }
-    } else if (element instanceof Watermark) {
+    } else if (event instanceof Watermark) {
       // Watermark
-      processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+      processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
     } else {
       // Process data element
-      processElement(dataFetcher.getOutputCollector(), element);
+      processElement(dataFetcher.getOutputCollector(), event);
     }
   }
 
@@ -457,7 +441,7 @@ public final class TaskExecutor {
         final DataFetcher dataFetcher = availableIterator.next();
         try {
           final Object element = dataFetcher.fetchDataElement();
-          handleElement(element, dataFetcher);
+          onEventFromDataFetcher(element, dataFetcher);
           if (element instanceof Finishmark) {
             availableIterator.remove();
           }
@@ -485,7 +469,7 @@ public final class TaskExecutor {
         final DataFetcher dataFetcher = pendingIterator.next();
         try {
           final Object element = dataFetcher.fetchDataElement();
-          handleElement(element, dataFetcher);
+          onEventFromDataFetcher(element, dataFetcher);
 
           // We processed data. This means the data fetcher is now available.
           // Add current data fetcher to available
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 6cbf694..6b94ca7 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -24,6 +24,7 @@ import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -121,9 +122,9 @@ public final class ParentTaskDataFetcherTest {
 
   private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) {
     return new ParentTaskDataFetcher(
-        mock(IRVertex.class),
-        readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
-        mock(OutputCollector.class));
+      mock(IRVertex.class),
+      readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
+      mock(OutputCollector.class));
   }
 
   private InputReader generateInputReader(final CompletableFuture completableFuture) {
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 6ae716a..919143b 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -26,7 +26,6 @@ import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
@@ -66,7 +65,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -422,16 +420,16 @@ public final class TaskExecutorTest {
         .buildWithoutSourceSinkCheck();
 
     final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+    final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1);
 
-    final StageEdge broadcastInEdge = mockBroadcastVariableStageEdgeTo(
-      new OperatorVertex(singleListTransform), operatorIRVertex2, broadcastId, elements);
+    when(broadcastManagerWorker.get(broadcastId)).thenReturn(new ArrayList<>(elements));
 
     final Task task = new Task(
         "testSourceVertexDataFetching",
         generateTaskId(),
         TASK_EXECUTION_PROPERTY_MAP,
         new byte[0],
-        Arrays.asList(mockStageEdgeTo(operatorIRVertex1), broadcastInEdge),
+        Collections.singletonList(taskInEdge),
         Collections.singletonList(taskOutEdge),
         Collections.emptyMap());
 
@@ -541,23 +539,6 @@ public final class TaskExecutorTest {
       mock(Stage.class));
   }
 
-  private StageEdge mockBroadcastVariableStageEdgeTo(final IRVertex srcVertex,
-                                                     final IRVertex dstVertex,
-                                                     final Serializable broadcastVariableId,
-                                                     final Object broadcastVariable) {
-    when(broadcastManagerWorker.get(broadcastVariableId)).thenReturn(broadcastVariable);
-
-    final ExecutionPropertyMap executionPropertyMap =
-      ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
-    executionPropertyMap.put(BroadcastVariableIdProperty.of(broadcastVariableId));
-    return new StageEdge("runtime outgoing edge id",
-      executionPropertyMap,
-      srcVertex,
-      dstVertex,
-      mock(Stage.class),
-      mock(Stage.class));
-  }
-
   /**
    * Represents the answer return an inter-stage {@link InputReader},
    * which will have multiple iterable according to the source parallelism.