Posted to commits@nemo.apache.org by ta...@apache.org on 2020/10/14 07:33:44 UTC

[incubator-nemo] branch master updated: [NEMO-392] Support combine in streaming (#299)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e9a8db  [NEMO-392] Support combine in streaming (#299)
1e9a8db is described below

commit 1e9a8db7ea21c4b5069272a680f1f3613e44238e
Author: jaehwan0214 <60...@users.noreply.github.com>
AuthorDate: Wed Oct 14 16:33:37 2020 +0900

    [NEMO-392] Support combine in streaming (#299)
    
    JIRA: [NEMO-392: Support combine in streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-392)
    
    **Major changes:**
    - Added the GBKTransform class to support stream processing for the GroupByKey and Combine.PerKey operations.
      - Each time an element arrives, GBKTransform invokes the runner to process that single element and stores
        its state, instead of buffering elements until it is time to emit them downstream (see the sketch below).
    - Removed GroupByKeyAndWindowDoFnTransform, since GBKTransform supports both the Combine.PerKey
      and GroupByKeyAndWindow operations.
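    
    The sketch below illustrates this per-element pattern in isolation. It is a minimal, standalone piece of
    plain Java with made-up names (PerElementCombineSketch, onData, onWatermark), not the Nemo/Beam code:
    each arriving element is folded into keyed state immediately, and results are emitted only when a
    watermark allows the window to fire.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    /** Minimal standalone sketch of per-element combining with keyed state (illustration only). */
    public final class PerElementCombineSketch {
      // Keyed state: a running sum per key, standing in for Beam's StateInternals.
      private final Map<String, Integer> stateByKey = new HashMap<>();
    
      /** Called once per arriving element, instead of buffering elements until the watermark. */
      public void onData(final String key, final int value) {
        stateByKey.merge(key, value, Integer::sum);
      }
    
      /** Called when the watermark advances: emit (here, print) what has been accumulated so far. */
      public void onWatermark(final long watermarkMillis) {
        stateByKey.forEach((key, sum) ->
          System.out.println("watermark=" + watermarkMillis + " emit " + key + "=" + sum));
        stateByKey.clear(); // discarding fired panes, for simplicity
      }
    
      public static void main(final String[] args) {
        final PerElementCombineSketch sketch = new PerElementCombineSketch();
        sketch.onData("a", 1);
        sketch.onData("b", 2);
        sketch.onData("a", 3);
        sketch.onWatermark(5_000L); // emits a=4 and b=2
      }
    }
    ```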
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - Added GBKTransformTest.
    
    **Other comments:**
    - Most of the work was done by TaeGun Um.
    
    Closes #299
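    
    For context, the streaming combine path splits the user-supplied combine function into a partial stage
    that emits its accumulator (PartialCombineFn) and a final stage that merges accumulators and extracts the
    output (FinalCombineFn), connected by a shuffle edge. The following is a standalone sketch of that
    decomposition for an integer-sum combine; it is plain Java for illustration only, and the names below
    (TwoStageCombineSketch, partialCombine, finalCombine) are made up rather than taken from the diff.
    
    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /** Standalone sketch of the partial/final combine decomposition (illustration only). */
    public final class TwoStageCombineSketch {
    
      /** Partial stage: fold raw values into per-key accumulators (running sums) and emit the accumulators. */
      static Map<String, Integer> partialCombine(final List<Map.Entry<String, Integer>> inputs) {
        final Map<String, Integer> accumulators = new HashMap<>();
        inputs.forEach(kv -> accumulators.merge(kv.getKey(), kv.getValue(), Integer::sum));
        return accumulators;
      }
    
      /** Final stage: merge the shuffled accumulators per key; extracting the output is the identity for a sum. */
      static Map<String, Integer> finalCombine(final List<Map<String, Integer>> shuffledAccumulators) {
        final Map<String, Integer> merged = new HashMap<>();
        shuffledAccumulators.forEach(partial ->
          partial.forEach((key, accum) -> merged.merge(key, accum, Integer::sum)));
        return merged;
      }
    
      public static void main(final String[] args) {
        // Two hypothetical upstream tasks each run the partial stage locally.
        final Map<String, Integer> partial1 =
          partialCombine(List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3)));
        final Map<String, Integer> partial2 =
          partialCombine(List.of(Map.entry("a", 4), Map.entry("b", 5)));
        // A shuffle edge groups the accumulators by key before the final stage.
        System.out.println(finalCombine(List.of(partial1, partial2))); // {a=8, b=7} (map order may vary)
      }
    }
    ```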
---
 .../frontend/beam/PipelineTranslationContext.java  |   2 +-
 .../compiler/frontend/beam/PipelineTranslator.java | 114 +++-
 .../beam/transform/AbstractDoFnTransform.java      |   1 -
 .../frontend/beam/transform/FinalCombineFn.java    |  72 +++
 .../frontend/beam/transform/GBKTransform.java      | 300 ++++++++++
 .../GroupByKeyAndWindowDoFnTransform.java          | 405 -------------
 .../transform/InMemoryStateInternalsFactory.java   |  51 ++
 .../transform/InMemoryTimerInternalsFactory.java   |  81 +++
 .../frontend/beam/transform/PartialCombineFn.java  |  70 +++
 .../frontend/beam/transform/GBKTransformTest.java  | 654 +++++++++++++++++++++
 .../GroupByKeyAndWindowDoFnTransformTest.java      | 366 ------------
 11 files changed, 1313 insertions(+), 803 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 523851d..2d1b90b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -275,7 +275,7 @@ final class PipelineTranslationContext {
     if (srcTransform instanceof FlattenTransform) {
       return CommunicationPatternProperty.Value.ONE_TO_ONE;
     }
-    if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+    if (dstTransform instanceof GBKTransform
       || dstTransform instanceof GroupByKeyTransform) {
       return CommunicationPatternProperty.Value.SHUFFLE;
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index e18f4ae..1e429c3 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -348,49 +349,100 @@ final class PipelineTranslator {
     final TransformHierarchy.Node beamNode,
     final PTransform<?, ?> transform) {
 
-    // Check if the partial combining optimization can be applied.
-    // If not, simply use the default Combine implementation by entering into it.
-    if (!(isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline()))) {
-      // TODO #263: Partial Combining for Beam Streaming
-      return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
-    }
     final Combine.PerKey perKey = (Combine.PerKey) transform;
+
+    // If there are any side inputs, translate each primitive transform in this composite transform one by one.
     if (!perKey.getSideInputs().isEmpty()) {
       // TODO #264: Partial Combining with Beam SideInputs
       return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
     }
 
-    // This Combine can be optimized as the following sequence of Nemo IRVertices.
-    // Combine Input -> Combine(Partial Combine -> KV<InputT, AccumT> -> Final Combine) -> Combine Output
     final CombineFnBase.GlobalCombineFn combineFn = perKey.getFn();
-
-    // (Step 1) To Partial Combine
-    final IRVertex partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
-    ctx.addVertex(partialCombine);
-    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
-
-    // (Step 2) To Final Combine
-    final PCollection input = (PCollection) Iterables.getOnlyElement(
+    final PCollection<?> mainInput = (PCollection<?>) Iterables.getOnlyElement(
+      TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
+    final PCollection inputs = (PCollection) Iterables.getOnlyElement(
       TransformInputs.nonAdditionalInputs(beamNode.toAppliedPTransform(ctx.getPipeline())));
-    final KvCoder inputCoder = (KvCoder) input.getCoder();
+    final KvCoder inputCoder = (KvCoder) inputs.getCoder();
     final Coder accumulatorCoder;
+
+    // Check if accumulator coder exists
     try {
       accumulatorCoder =
         combineFn.getAccumulatorCoder(ctx.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
     } catch (CannotProvideCoderException e) {
       throw new RuntimeException(e);
     }
-    final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
-    ctx.addVertex(finalCombine);
-    final IREdge edge = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, partialCombine, finalCombine);
-    ctx.addEdge(
-      edge,
-      KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-      input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    // (Step 3) To Combine Output
+    // If there are no side inputs,
+    // this Combine can be optimized as the following sequence of Nemo IRVertices.
+    // Combine Input -> partialCombine -> finalCombine -> Combine Output
+    final IRVertex partialCombine;
+    final IRVertex finalCombine;
+
+    // Choose between batch processing and stream processing based on window type and boundedness of data
+    if (isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline())) {
+      // Batch processing, using CombineFnPartialTransform and CombineFnFinalTransform
+      partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
+      finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
+    } else {
+      // Stream data processing, using GBKTransform
+      final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+      final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn(
+        (Combine.CombineFn) combineFn, accumulatorCoder);
+      final CombineFnBase.GlobalCombineFn finalCombineFn = new FinalCombineFn(
+        (Combine.CombineFn) combineFn, accumulatorCoder);
+      final SystemReduceFn partialSystemReduceFn =
+        SystemReduceFn.combining(
+          inputCoder.getKeyCoder(),
+          AppliedCombineFn.withInputCoder(partialCombineFn,
+            ctx.getPipeline().getCoderRegistry(), inputCoder,
+            null,
+            mainInput.getWindowingStrategy()));
+      final SystemReduceFn finalSystemReduceFn =
+        SystemReduceFn.combining(
+          inputCoder.getKeyCoder(),
+          AppliedCombineFn.withInputCoder(finalCombineFn,
+            ctx.getPipeline().getCoderRegistry(),
+            KvCoder.of(inputCoder.getKeyCoder(),
+              accumulatorCoder),
+            null, mainInput.getWindowingStrategy()));
+      final GBKTransform partialCombineStreamTransform =
+        new GBKTransform(
+          getOutputCoders(pTransform),
+          new TupleTag<>(),
+          mainInput.getWindowingStrategy(),
+          ctx.getPipelineOptions(),
+          partialSystemReduceFn,
+          DoFnSchemaInformation.create(),
+          DisplayData.from(beamNode.getTransform()));
+
+      final GBKTransform finalCombineStreamTransform =
+        new GBKTransform(
+          getOutputCoders(pTransform),
+          new TupleTag<>(),
+          mainInput.getWindowingStrategy(),
+          ctx.getPipelineOptions(),
+          finalSystemReduceFn,
+          DoFnSchemaInformation.create(),
+          DisplayData.from(beamNode.getTransform()));
+
+      partialCombine = new OperatorVertex(partialCombineStreamTransform);
+      finalCombine = new OperatorVertex(finalCombineStreamTransform);
+    }
+
+    // (Step 1) Partial Combine
+    ctx.addVertex(partialCombine);
+    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(partialCombine, input));
+
+    // (Step 2) Final Combine
+    ctx.addVertex(finalCombine);
     beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, finalCombine, output));
 
+    // (Step 3) Add an edge from the partialCombine vertex to the finalCombine vertex
+    final IREdge edge = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, partialCombine, finalCombine);
+    final Coder intermediateCoder = KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder);
+    ctx.addEdge(edge, intermediateCoder, mainInput.getWindowingStrategy().getWindowFn().windowCoder());
+
     // This composite transform has been translated in its entirety.
     return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
   }
@@ -490,30 +542,32 @@ final class PipelineTranslator {
   }
 
   /**
-   * Create a group by key transform.
-   * It returns GroupByKeyAndWindowDoFnTransform if window function is not default.
+   * Returns the correct type of GroupByKey transform depending on whether a global windowing strategy is used.
    *
    * @param ctx      translation context
    * @param beamNode the beam node to be translated
-   * @return group by key transform
+   * @return GroupByKey transform
    */
   private static Transform createGBKTransform(
     final PipelineTranslationContext ctx,
     final TransformHierarchy.Node beamNode) {
-    final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
+    final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
     final PCollection<?> mainInput = (PCollection<?>)
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
 
     if (isGlobalWindow(beamNode, ctx.getPipeline())) {
+      // GroupByKey Transform when using a global windowing strategy.
       return new GroupByKeyTransform();
     } else {
-      return new GroupByKeyAndWindowDoFnTransform(
+      // GroupByKey Transform when using a non-global windowing strategy.
+      return new GBKTransform<>(
         getOutputCoders(pTransform),
         mainOutputTag,
         mainInput.getWindowingStrategy(),
         ctx.getPipelineOptions(),
         SystemReduceFn.buffering(mainInput.getCoder()),
+        DoFnSchemaInformation.create(),
         DisplayData.from(beamNode.getTransform()));
     }
   }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 9a50bb5..79b31c6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -37,7 +37,6 @@ import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java
new file mode 100644
index 0000000..c5ad209
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FinalCombineFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper class for {@link Combine.CombineFn}.
+ * When adding an input, it merges its accumulator and the input accumulator into a single accumulator.
+ * @param <AccumT> accumulator type
+ * @param <Output> output type
+ */
+public final class FinalCombineFn<AccumT, Output> extends Combine.CombineFn<AccumT, AccumT, Output> {
+  private static final Logger LOG = LoggerFactory.getLogger(FinalCombineFn.class.getName());
+  private final Combine.CombineFn<?, AccumT, Output> originFn;
+  private final Coder<AccumT> accumCoder;
+
+  public FinalCombineFn(final Combine.CombineFn<?, AccumT, Output> originFn,
+                        final Coder<AccumT> accumCoder) {
+    this.originFn = originFn;
+    this.accumCoder = accumCoder;
+  }
+
+  @Override
+  public AccumT createAccumulator() {
+    return originFn.createAccumulator();
+  }
+
+  @Override
+  public AccumT addInput(final AccumT accumulator, final AccumT input) {
+    final AccumT result = originFn.mergeAccumulators(Arrays.asList(accumulator, input));
+    return result;
+  }
+
+  @Override
+  public Coder<AccumT> getAccumulatorCoder(final CoderRegistry registry, final Coder<AccumT> ac) {
+    return accumCoder;
+  }
+
+  @Override
+  public AccumT mergeAccumulators(final Iterable<AccumT> accumulators) {
+    return originFn.mergeAccumulators(accumulators);
+  }
+
+  @Override
+  public Output extractOutput(final AccumT accumulator) {
+    final Output result = originFn.extractOutput(accumulator);
+    return result;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
new file mode 100644
index 0000000..37952ea
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import java.util.*;
+
+/**
+ * This transform performs the GroupByKey or Combine.PerKey operation when the input data is unbounded
+ * or is not in the global window.
+ * @param <K> key type
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+public final class GBKTransform<K, InputT, OutputT>
+  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GBKTransform.class.getName());
+  private final SystemReduceFn reduceFn;
+  private transient InMemoryTimerInternalsFactory<K> inMemoryTimerInternalsFactory;
+  private transient InMemoryStateInternalsFactory<K> inMemoryStateInternalsFactory;
+  private final Map<K, Watermark> keyOutputWatermarkMap = new HashMap<>();
+  private Watermark prevOutputWatermark = new Watermark(Long.MIN_VALUE);
+  private Watermark inputWatermark = new Watermark(Long.MIN_VALUE);
+  private boolean dataReceived = false;
+  private transient OutputCollector originOc;
+
+  public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
+                      final TupleTag<KV<K, OutputT>> mainOutputTag,
+                      final WindowingStrategy<?, ?> windowingStrategy,
+                      final PipelineOptions options,
+                      final SystemReduceFn reduceFn,
+                      final DoFnSchemaInformation doFnSchemaInformation,
+                      final DisplayData displayData) {
+    super(null,
+      null,
+      outputCoders,
+      mainOutputTag,
+      Collections.emptyList(),  /* no additional outputs */
+      windowingStrategy,
+      Collections.emptyMap(), /* no additional side inputs */
+      options,
+      displayData,
+      doFnSchemaInformation,
+      Collections.emptyMap()); /* does not have side inputs */
+    this.reduceFn = reduceFn;
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * @param doFn original doFn.
+   * @return GroupAlsoByWindowViaWindowSetNewDoFn
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    if (inMemoryStateInternalsFactory == null) {
+      this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryStateInternalFactory is already set");
+    }
+
+    if (inMemoryTimerInternalsFactory == null) {
+      this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory<>();
+    } else {
+      LOG.info("InMemoryTimerInternalsFactory is already set");
+    }
+
+    // This function performs group by key and window operation.
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        inMemoryStateInternalsFactory,
+        inMemoryTimerInternalsFactory,
+        null, // does not have side input.
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  /** Wrapper function of output collector. */
+  @Override
+  OutputCollector wrapOutputCollector(final OutputCollector oc) {
+    originOc = oc;
+    return new GBKOutputCollector(oc);
+  }
+
+  /**
+   * Every time an element arrives, this method invokes the runner to process that single element.
+   * @param element input data element.
+   */
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+    dataReceived = true;
+    try {
+      checkAndInvokeBundle();
+      final KV<K, InputT> kv = element.getValue();
+      final KeyedWorkItem<K, InputT> keyedWorkItem =
+        KeyedWorkItems.elementsWorkItem(kv.getKey(),
+          Collections.singletonList(element.withValue(kv.getValue())));
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      checkAndFinishBundle();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Exception while processing element " + element.toString(), e);
+    }
+  }
+
+  /**
+   * Trigger timers that need to be fired at {@param watermark} and emit output watermark.
+   * @param watermark watermark
+   */
+  @Override
+  public void onWatermark(final Watermark watermark) throws RuntimeException {
+    if (watermark.getTimestamp() <= inputWatermark.getTimestamp()) {
+      throw new RuntimeException(
+        "Received watermark " + watermark.getTimestamp()
+          + " is before the previous inputWatermark " + inputWatermark.getTimestamp() + " in GBKTransform.");
+    }
+    checkAndInvokeBundle();
+    inputWatermark = watermark;
+    try {
+      // Trigger timers
+      triggerTimers(Instant.now(), Instant.now(), inputWatermark);
+      // Emit output watermark
+      emitOutputWatermark();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    checkAndFinishBundle();
+  }
+
+  /**
+   * This advances the input watermark and processing time to the timestamp max value
+   * in order to emit all data.
+   */
+  @Override
+  protected void beforeClose() {
+    // Finish any pending windows by advancing the input watermark to timestamp max value.
+    inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    // Trigger all the remaining timers that have not been fired yet.
+    triggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, inputWatermark);
+    // Emit output watermark
+    emitOutputWatermark();
+  }
+
+  /**
+   * Trigger eligible timers. When triggering, it emits the output to downstream operators.
+   * @param processingTime processing time
+   * @param synchronizedTime synchronized time
+   * @param watermark watermark
+   */
+  private void triggerTimers(final Instant processingTime,
+                            final Instant synchronizedTime,
+                            final Watermark watermark) {
+    final Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
+      inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
+    while (iter.hasNext()) {
+      final Map.Entry<K, InMemoryTimerInternals> curr = iter.next();
+      try {
+        curr.getValue().advanceInputWatermark(new Instant(watermark.getTimestamp()));
+        curr.getValue().advanceProcessingTime(processingTime);
+        curr.getValue().advanceSynchronizedProcessingTime(synchronizedTime);
+      } catch (final Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException();
+      }
+      for (final TimeDomain domain : TimeDomain.values()) {
+        processTrigger(curr.getKey(), curr.getValue(), domain);
+      }
+      // Remove timerInternals and stateInternals that are no longer needed.
+      if (inMemoryTimerInternalsFactory.isEmpty(curr.getValue())) {
+        iter.remove();
+        inMemoryStateInternalsFactory.getStateInternalMap().remove(curr.getKey());
+      }
+    }
+  }
+
+  /**
+   * Fetch eligible timers in {@param timeDomain} and trigger them.
+   * @param key key
+   * @param timerInternal timerInternal to be accessed
+   * @param timeDomain time domain
+   */
+  private void processTrigger(final K key, final InMemoryTimerInternals timerInternal, final TimeDomain timeDomain) {
+    TimerInternals.TimerData timer;
+    // Get all eligible timers and trigger them.
+    while ((timer = inMemoryTimerInternalsFactory.pollTimer(timerInternal, timeDomain)) != null) {
+      final KeyedWorkItem<K, InputT> timerWorkItem =
+        KeyedWorkItems.timersWorkItem(key, Collections.singletonList(timer));
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+    }
+  }
+
+  /**
+   * Emit watermark to downstream operators.
+   * Output watermark = max(prev output watermark, min(input watermark, watermark holds)).
+   */
+  private void emitOutputWatermark() {
+    // Find min watermark hold
+    Watermark minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
+      : Collections.min(keyOutputWatermarkMap.values());
+
+    Watermark outputWatermarkCandidate = new Watermark(
+      Math.max(prevOutputWatermark.getTimestamp(),
+        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+
+    while (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
+      // Progress
+      prevOutputWatermark = outputWatermarkCandidate;
+      // Emit watermark
+      getOutputCollector().emitWatermark(outputWatermarkCandidate);
+      // Remove minimum watermark holds
+      if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
+        final long minWatermarkTimestamp = minWatermarkHold.getTimestamp();
+        keyOutputWatermarkMap.entrySet()
+          .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkTimestamp);
+      }
+
+      minWatermarkHold = keyOutputWatermarkMap.isEmpty()
+        ? new Watermark(Long.MAX_VALUE) : Collections.min(keyOutputWatermarkMap.values());
+
+      outputWatermarkCandidate = new Watermark(
+        Math.max(prevOutputWatermark.getTimestamp(),
+          Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
+    }
+  }
+
+  /** Wrapper class for {@link OutputCollector}. */
+  public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
+    private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;
+
+    public GBKOutputCollector(final OutputCollector oc) {
+      this.oc = oc;
+    }
+
+    /** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */
+    @Override
+    public void emit(final WindowedValue<KV<K, OutputT>> output) {
+      // The watermark advances only in ON_TIME
+      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
+        final KV<K, OutputT> value = output.getValue();
+        final K key = value.getKey();
+        final InMemoryTimerInternals timerInternals =
+          (InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+        // Add the output timestamp to the watermark hold of each key.
+        // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999.
+        keyOutputWatermarkMap.put(key,
+          new Watermark(output.getTimestamp().getMillis() + 1));
+        timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+      }
+      oc.emit(output);
+    }
+
+    /** Emit watermark. */
+    @Override
+    public void emitWatermark(final Watermark watermark) {
+      oc.emitWatermark(watermark);
+    }
+
+    /** Emit output value to {@param dstVertexId}. */
+    @Override
+    public <T> void emit(final String dstVertexId, final T output) {
+      oc.emit(dstVertexId, output);
+    }
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
deleted file mode 100644
index 0818270..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.*;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.punctuation.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Groups elements according to key and window.
- *
- * @param <K>      key type.
- * @param <InputT> input type.
- */
-public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
-  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, Iterable<InputT>>> {
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
-
-  private final SystemReduceFn reduceFn;
-  private final Map<K, List<WindowedValue<InputT>>> keyToValues;
-  private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
-  private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
-  private Watermark prevOutputWatermark;
-  private final Map<K, Watermark> keyAndWatermarkHoldMap;
-  private boolean dataReceived = false;
-
-  /**
-   * GroupByKey constructor.
-   *
-   * @param outputCoders      output coders
-   * @param mainOutputTag     main output tag
-   * @param windowingStrategy windowing strategy
-   * @param options           pipeline options
-   * @param reduceFn          reduce function
-   * @param displayData       display data.
-   */
-  public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
-                                          final TupleTag<KV<K, Iterable<InputT>>> mainOutputTag,
-                                          final WindowingStrategy<?, ?> windowingStrategy,
-                                          final PipelineOptions options,
-                                          final SystemReduceFn reduceFn,
-                                          final DisplayData displayData) {
-    super(null, /* doFn */
-      null, /* inputCoder */
-      outputCoders,
-      mainOutputTag,
-      Collections.emptyList(),  /*  GBK does not have additional outputs */
-      windowingStrategy,
-      Collections.emptyMap(), /*  GBK does not have additional side inputs */
-      options,
-      displayData,
-      DoFnSchemaInformation.create(),
-      Collections.emptyMap());
-    this.keyToValues = new HashMap<>();
-    this.reduceFn = reduceFn;
-    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
-    this.keyAndWatermarkHoldMap = new HashMap<>();
-  }
-
-  /**
-   * This creates a new DoFn that groups elements by key and window.
-   *
-   * @param doFn original doFn.
-   * @return GroupAlsoByWindowViaWindowSetNewDoFn
-   */
-  @Override
-  protected DoFn wrapDoFn(final DoFn doFn) {
-    final Map<K, StateAndTimerForKey> map = new HashMap<>();
-    this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
-    this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);
-
-    // This function performs group by key and window operation
-    return
-      GroupAlsoByWindowViaWindowSetNewDoFn.create(
-        getWindowingStrategy(),
-        inMemoryStateInternalsFactory,
-        inMemoryTimerInternalsFactory,
-        null, // GBK has no sideinput.
-        reduceFn,
-        getOutputManager(),
-        getMainOutputTag());
-  }
-
-  @Override
-  OutputCollector wrapOutputCollector(final OutputCollector oc) {
-    return new GBKWOutputCollector(oc);
-  }
-
-  /**
-   * It collects data for each key.
-   * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
-   *
-   * @param element data element
-   */
-  @Override
-  public void onData(final WindowedValue<KV<K, InputT>> element) {
-    checkAndInvokeBundle();
-    dataReceived = true;
-
-    // We can call Beam's DoFnRunner#processElement here,
-    // but it may generate some overheads if we call the method for each data.
-    // The `processElement` requires a `Iterator` of data, so we emit the buffered data every watermark.
-    // TODO #250: But, this approach can delay the event processing in streaming,
-    // TODO #250: if the watermark is not triggered for a long time.
-    final KV<K, InputT> kv = element.getValue();
-    keyToValues.putIfAbsent(kv.getKey(), new ArrayList<>());
-    keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
-
-    checkAndFinishBundle();
-  }
-
-  /**
-   * Process the collected data and trigger timers.
-   *
-   * @param inputWatermark   current input watermark
-   * @param processingTime   processing time
-   * @param synchronizedTime synchronized time
-   */
-  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
-                                               final Instant processingTime,
-                                               final Instant synchronizedTime) {
-    for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
-      final K key = entry.getKey();
-      final List<WindowedValue<InputT>> values = entry.getValue();
-
-      // for each key
-      // Process elements
-      if (!values.isEmpty()) {
-        final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.elementsWorkItem(key, values);
-        // The DoFnRunner interface requires WindowedValue,
-        // but this windowed value is actually not used in the ReduceFnRunner internal.
-        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
-      }
-
-      // Trigger timers
-      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
-
-      // Remove values
-      values.clear();
-    }
-  }
-
-  /**
-   * Output watermark
-   * = max(prev output watermark,
-   * min(input watermark, watermark holds)).
-   *
-   * @param inputWatermark input watermark
-   */
-  private void emitOutputWatermark(final Watermark inputWatermark) {
-    // Find min watermark hold
-    final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
-      ? new Watermark(dataReceived ? Long.MIN_VALUE : Long.MAX_VALUE)
-      // set this to MAX, in order not to emit input watermark when there are no outputs.
-      : Collections.min(keyAndWatermarkHoldMap.values());
-    final Watermark outputWatermarkCandidate = new Watermark(
-      Math.max(prevOutputWatermark.getTimestamp(),
-        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Watermark hold: {}, "
-        + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
-    }
-
-    if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
-      // progress!
-      prevOutputWatermark = outputWatermarkCandidate;
-      // emit watermark
-      getOutputCollector().emitWatermark(outputWatermarkCandidate);
-      // Remove minimum watermark holds
-      if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
-        keyAndWatermarkHoldMap.entrySet()
-          .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
-      }
-    }
-  }
-
-  @Override
-  public void onWatermark(final Watermark inputWatermark) {
-    checkAndInvokeBundle();
-    processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
-    // Emit watermark to downstream operators
-    emitOutputWatermark(inputWatermark);
-    checkAndFinishBundle();
-  }
-
-  /**
-   * This advances the input watermark and processing time to the timestamp max value
-   * in order to emit all data.
-   */
-  @Override
-  protected void beforeClose() {
-    // Finish any pending windows by advancing the input watermark to infinity.
-    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-
-  /**
-   * Trigger times for current key.
-   * When triggering, it emits the windowed data to downstream operators.
-   *
-   * @param key              key
-   * @param watermark        watermark
-   * @param processingTime   processing time
-   * @param synchronizedTime synchronized time
-   */
-  private void triggerTimers(final K key,
-                             final Watermark watermark,
-                             final Instant processingTime,
-                             final Instant synchronizedTime) {
-    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-    try {
-      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
-      timerInternals.advanceProcessingTime(processingTime);
-      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
-
-    if (!timerDataList.isEmpty()) {
-      // Trigger timers and emit windowed data
-      final KeyedWorkItem<K, InputT> timerWorkItem =
-        KeyedWorkItems.timersWorkItem(key, timerDataList);
-      // The DoFnRunner interface requires WindowedValue,
-      // but this windowed value is actually not used in the ReduceFnRunner internal.
-      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-    }
-  }
-
-  /**
-   * Get timer data.
-   *
-   * @param timerInternals in-memory timer internals.
-   * @return list of timer datas.
-   */
-  private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
-    final List<TimerInternals.TimerData> timerData = new LinkedList<>();
-
-    while (true) {
-      TimerInternals.TimerData timer;
-      boolean hasFired = false;
-
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        hasFired = true;
-        timerData.add(timer);
-      }
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        hasFired = true;
-        timerData.add(timer);
-      }
-      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
-        hasFired = true;
-        timerData.add(timer);
-      }
-      if (!hasFired) {
-        break;
-      }
-    }
-
-    return timerData;
-  }
-
-  /**
-   * State and timer internal.
-   */
-  final class StateAndTimerForKey {
-    private StateInternals stateInternals;
-    private TimerInternals timerInternals;
-
-    /**
-     * @param stateInternals state internals.
-     * @param timerInternals timer internals.
-     */
-    StateAndTimerForKey(final StateInternals stateInternals,
-                        final TimerInternals timerInternals) {
-      this.stateInternals = stateInternals;
-      this.timerInternals = timerInternals;
-    }
-  }
-
-  /**
-   * InMemoryStateInternalsFactory.
-   */
-  final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
-    private final Map<K, StateAndTimerForKey> map;
-
-    /**
-     * @param map initial map.
-     */
-    InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
-      this.map = map;
-    }
-
-    @Override
-    public StateInternals stateInternalsForKey(final K key) {
-      map.putIfAbsent(key, new StateAndTimerForKey(InMemoryStateInternals.forKey(key), null));
-      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
-      if (stateAndTimerForKey.stateInternals == null) {
-        stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
-      }
-      return stateAndTimerForKey.stateInternals;
-    }
-  }
-
-  /**
-   * InMemoryTimerInternalsFactory.
-   */
-  final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
-    private final Map<K, StateAndTimerForKey> map;
-
-    /**
-     * @param map initial map.
-     */
-    InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
-      this.map = map;
-    }
-
-    @Override
-    public TimerInternals timerInternalsForKey(final K key) {
-      map.putIfAbsent(key, new StateAndTimerForKey(null, new InMemoryTimerInternals()));
-      final StateAndTimerForKey stateAndTimerForKey = map.get(key);
-      if (stateAndTimerForKey.timerInternals == null) {
-        stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
-      }
-      return stateAndTimerForKey.timerInternals;
-    }
-  }
-
-  /**
-   * This class wraps the output collector to track the watermark hold of each key.
-   */
-  final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
-    private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;
-
-    /**
-     * @param outputCollector output collector.
-     */
-    GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
-      this.outputCollector = outputCollector;
-    }
-
-    @Override
-    public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
-
-      // The watermark advances only in ON_TIME
-      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
-        final K key = output.getValue().getKey();
-        final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-          inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-        keyAndWatermarkHoldMap.put(key,
-          // adds the output timestamp to the watermark hold of each key
-          // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
-          new Watermark(output.getTimestamp().getMillis() + 1));
-        timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
-      }
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public void emitWatermark(final Watermark watermark) {
-      outputCollector.emitWatermark(watermark);
-    }
-
-    @Override
-    public <T> void emit(final String dstVertexId, final T output) {
-      outputCollector.emit(dstVertexId, output);
-    }
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
new file mode 100644
index 0000000..f57a707
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryStateInternalsFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * InMemoryStateInternalsFactory.
+ * @param <K> key type
+ */
+public final class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K> {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateInternalsFactory.class.getName());
+  private final Map<K, StateInternals> stateInternalMap = new HashMap<>();
+
+  @Override
+  public String toString() {
+    return "StateInternalMap: " + stateInternalMap;
+  }
+
+  @Override
+  public StateInternals stateInternalsForKey(final K key) {
+    stateInternalMap.putIfAbsent(key,
+      InMemoryStateInternals.forKey(key));
+    return stateInternalMap.get(key);
+  }
+
+  public Map<K, StateInternals> getStateInternalMap() {
+    return stateInternalMap;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java
new file mode 100644
index 0000000..0072463
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/InMemoryTimerInternalsFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.*;
+
+/**
+ * InMemoryTimerInternalsFactory.
+ * @param <K> key type
+ */
+public final class InMemoryTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryTimerInternalsFactory.class.getName());
+  private Map<K, InMemoryTimerInternals> timerInternalsMap = new HashMap<>();
+
+  @Override
+  public String toString() {
+    return "TimerInternalsMap: " + timerInternalsMap;
+  }
+
+  @Override
+  public TimerInternals timerInternalsForKey(final K key) {
+    if (timerInternalsMap.get(key) != null) {
+      return timerInternalsMap.get(key);
+    } else {
+      final InMemoryTimerInternals internal = new InMemoryTimerInternals();
+      timerInternalsMap.put(key, internal);
+      return internal;
+    }
+  }
+
+  /** Remove the next eligible timer in {@param timeDomain}. */
+  public TimerInternals.TimerData pollTimer(final InMemoryTimerInternals timerInternal, final TimeDomain timeDomain) {
+    switch (timeDomain) {
+      case EVENT_TIME :
+        return timerInternal.removeNextEventTimer();
+      case PROCESSING_TIME:
+        return timerInternal.removeNextProcessingTimer();
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return timerInternal.removeNextSynchronizedProcessingTimer();
+      default :
+        return null;
+    }
+  }
+
+  /** Accessor for timerInternalsMap. */
+  public Map<K, InMemoryTimerInternals> getTimerInternalsMap() {
+    return timerInternalsMap;
+  }
+
+  /** Helper method to check whether {@param timerInternal} has no timers left. */
+  public boolean isEmpty(final InMemoryTimerInternals timerInternal) {
+    for (final TimeDomain domain : TimeDomain.values()) {
+      if (timerInternal.getNextTimer(domain) != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java
new file mode 100644
index 0000000..8a94dea
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PartialCombineFn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper class for {@link Combine.CombineFn}.
+ * When extracting output, it emits its accumulator instead of the output of the original combine function.
+ * @param <InputT> input type
+ * @param <AccumT> accumulator type
+ */
+public final class PartialCombineFn<InputT, AccumT> extends Combine.CombineFn<InputT, AccumT, AccumT> {
+  private static final Logger LOG = LoggerFactory.getLogger(PartialCombineFn.class.getName());
+  private final Combine.CombineFn<InputT, AccumT, ?> originFn;
+  private final Coder<AccumT> accumCoder;
+
+  public PartialCombineFn(final Combine.CombineFn<InputT, AccumT, ?> originFn,
+                          final Coder<AccumT> accumCoder) {
+    this.originFn = originFn;
+    this.accumCoder = accumCoder;
+  }
+
+  @Override
+  public AccumT createAccumulator() {
+    return originFn.createAccumulator();
+  }
+
+  @Override
+  public AccumT addInput(final AccumT accumulator, final InputT input) {
+    return originFn.addInput(accumulator, input);
+  }
+
+  @Override
+  public AccumT mergeAccumulators(final Iterable<AccumT> accumulators) {
+    return originFn.mergeAccumulators(accumulators);
+  }
+
+  @Override
+  public AccumT extractOutput(final AccumT accumulator) {
+    return accumulator;
+  }
+
+  @Override
+  public Coder<AccumT> getAccumulatorCoder(final CoderRegistry registry, final Coder<InputT> inputCoder)
+    throws CannotProvideCoderException {
+    return accumCoder;
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
new file mode 100644
index 0000000..9a98932
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import junit.framework.TestCase;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.*;
+
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class GBKTransformTest extends TestCase {
+  private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName());
+  private final static Coder STRING_CODER = StringUtf8Coder.of();
+  private final static Coder INTEGER_CODER = BigEndianIntegerCoder.of();
+  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+  private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) {
+    // check key
+    assertEquals(expected.getKey(), result.getKey());
+    // check value
+    assertEquals(expected.getValue(), result.getValue());
+  }
+
+  private void checkOutput2(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
+    // check key
+    assertEquals(expected.getKey(), result.getKey());
+    // check value
+    final List<String> resultValue = new ArrayList<>();
+    final List<String> expectedValue = new ArrayList<>(expected.getValue());
+    result.getValue().iterator().forEachRemaining(resultValue::add);
+    Collections.sort(resultValue);
+    Collections.sort(expectedValue);
+    assertEquals(expectedValue, resultValue);
+  }
+
+
+  // Test the Combine.PerKey operation.
+
+  // Define combine function.
+  public static class CountFn extends Combine.CombineFn<Integer, CountFn.Accum, Integer> {
+
+    public static class Accum {
+      int sum = 0;
+    }
+
+    @Override
+    public Accum createAccumulator() {
+      return new Accum();
+    }
+
+    @Override
+    public Accum addInput(Accum accum, Integer input) {
+      accum.sum += input;
+      return accum;
+    }
+
+    @Override
+    public Accum mergeAccumulators(Iterable<Accum> accums) {
+      Accum merged = createAccumulator();
+      for (Accum accum : accums) {
+        merged.sum += accum.sum;
+      }
+      return merged;
+    }
+
+    @Override
+    public Integer extractOutput(Accum accum) {
+      return accum.sum;
+    }
+
+    @Override
+    public Coder<CountFn.Accum> getAccumulatorCoder(CoderRegistry registry, Coder<Integer> inputcoder) {
+      return AvroCoder.of(CountFn.Accum.class);
+    }
+  }
+
+  public final static Combine.CombineFn combine_fn = new CountFn();
+
+  // window size: 10 sec
+  // period: 5 sec
+  //
+  // [----------------------- window1 --------------------------]
+  //                             [---------------------------window2-------------------------]
+  //                                                            [-------------------------window3----------------------]
+  //
+  //  ts1 -- ts2 -------------------- ts3 -- w1 -- ts4 --------- ts5 - w2 ------------ ts6 -----ts7 -- w3 -- ts8 --ts9 --- w4
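+  //
+  // For reference, each timestamp above lands in two of the 10 s windows (period 5 s), e.g.
+  // ts1 (1000) -> [-5000, 5000) and [0, 10000), ts5 (11000) -> [5000, 15000) and [10000, 20000).
+  // Each watermark then closes one window: w1 (7000) closes window1, w2 (12000) closes window2,
+  // w3 (18000) closes window3, and w4 (21000) closes window4.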
+
+  // Test without late data
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test_combine() {
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(10))
+      .every(Duration.standardSeconds(5));
+
+    final Instant ts1 = new Instant(1000);
+    final Instant ts2 = new Instant(2000);
+    final Instant ts3 = new Instant(6000);
+    final Instant ts4 = new Instant(8000);
+    final Instant ts5 = new Instant(11000);
+    final Instant ts6 = new Instant(14000);
+    final Instant ts7 = new Instant(16000);
+    final Instant ts8 = new Instant(17000);
+    final Instant ts9 = new Instant(19000);
+    final Watermark watermark1 = new Watermark(7000);
+    final Watermark watermark2 = new Watermark(12000);
+    final Watermark watermark3 = new Watermark(18000);
+    final Watermark watermark4 = new Watermark(21000);
+
+    AppliedCombineFn<String, Integer, CountFn.Accum, Integer> applied_combine_fn =
+      AppliedCombineFn.withInputCoder(
+        combine_fn,
+        CoderRegistry.createDefault(),
+        KvCoder.of(STRING_CODER, INTEGER_CODER),
+        null,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES)
+      );
+
+    final GBKTransform<String, Integer, Integer> combine_transform =
+      new GBKTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
+        DoFnSchemaInformation.create(),
+        DisplayData.none());
+
+    // window1 : [-5000, 5000) in milliseconds
+    // window2 : [0, 10000)
+    // window3 : [5000, 15000)
+    // window4 : [10000, 20000)
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window1 = sortedWindows.get(0);
+    final IntervalWindow window2 = sortedWindows.get(1);
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts5));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window3 = sortedWindows.get(0);
+    final IntervalWindow window4 = sortedWindows.get(1);
+
+    // Prepare to test the combining GBKTransform
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Integer>> oc = new TestOutputCollector();
+    combine_transform.prepare(context, oc);
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 1), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window1
+    combine_transform.onWatermark(watermark1);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    assertEquals(2, oc.outputs.size());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(1).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 1), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window2
+    combine_transform.onWatermark(watermark2);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs
+    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+    assertEquals(3, oc.outputs.size());
+    checkOutput(KV.of("a", 2), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 1), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(2).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window3
+    combine_transform.onWatermark(watermark3);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs
+    assertEquals(Arrays.asList(window3), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 2), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 1), oc.outputs.get(2).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("c", 3), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+    // Emit outputs of window4
+    combine_transform.onWatermark(watermark4);
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // Check outputs
+    assertEquals(Arrays.asList(window4), oc.outputs.get(0).getWindows());
+    checkOutput(KV.of("a", 1), oc.outputs.get(0).getValue());
+    checkOutput(KV.of("b", 2), oc.outputs.get(1).getValue());
+    checkOutput(KV.of("c", 4), oc.outputs.get(2).getValue());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+  }
+
+  // Test with late data
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test_combine_lateData() {
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final Duration lateness = Duration.standardSeconds(2);
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(10))
+      .every(Duration.standardSeconds(5));
+
+    final Instant ts1 = new Instant(1000);
+    final Instant ts2 = new Instant(2000);
+    final Instant ts3 = new Instant(4500);
+    final Instant ts4 = new Instant(11000);
+    final Watermark watermark1 = new Watermark(6500);
+    final Watermark watermark2 = new Watermark(8000);
+
+    AppliedCombineFn<String, Integer, CountFn.Accum, Integer> applied_combine_fn =
+      AppliedCombineFn.withInputCoder(
+        combine_fn,
+        CoderRegistry.createDefault(),
+        KvCoder.of(STRING_CODER, INTEGER_CODER),
+        null,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness)
+      );
+
+    final GBKTransform<String, Integer, Integer> combine_transform =
+      new GBKTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
+        DoFnSchemaInformation.create(),
+        DisplayData.none());
+
+    // window1 : [-5000, 5000) in milliseconds
+    // window2 : [0, 10000)
+    // window3 : [5000, 15000)
+    // window4 : [10000, 20000)
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window1 = sortedWindows.get(0);
+    final IntervalWindow window2 = sortedWindows.get(1);
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+    final IntervalWindow window3 = sortedWindows.get(0);
+    final IntervalWindow window4 = sortedWindows.get(1);
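+
+    // For reference: window1 is [-5000, 5000) and the allowed lateness is 2000 ms, so window1
+    // accepts late data until the watermark passes 5000 + 2000 = 7000. watermark1 (6500) is below
+    // that bound, so late data arriving after it is still accumulated into window1's pane;
+    // watermark2 (8000) is beyond it, so window1 has expired by then.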
+
+    // Prepare to test
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Integer>> oc = new TestOutputCollector();
+    combine_transform.prepare(context, oc);
+
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 1), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+    combine_transform.onData(WindowedValue.of(
+      KV.of("b", 1), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+
+    // On-time firing of window1. Output checks are skipped here because test_combine already covers non-late data.
+    combine_transform.onWatermark(watermark1);
+    oc.outputs.clear();
+
+    // Late data in window 1. Should be accumulated since EOW + allowed lateness > current Watermark
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 5), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+
+    // Check outputs
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    checkOutput(KV.of("a", 6), oc.outputs.get(0).getValue());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    // Late data in window1. It should NOT be accumulated into window1's previous outputs, since EOW + allowed lateness < current watermark (5000 + 2000 < 8000).
+    combine_transform.onWatermark(watermark2);
+    combine_transform.onData(WindowedValue.of(
+      KV.of("a", 10), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+
+    // Check outputs
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    checkOutput(KV.of("a", 10), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+    oc.watermarks.clear();
+  }
+
+  // Now test the GroupByKey operation.
+
+  // window size: 2 sec
+  // interval size: 1 sec
+  //
+  //                           [--------------window2------------------------------]
+  // [----------------------- window1 --------------------------]
+  // [-------window0-------]
+  // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
+  // (1, "hello")
+  //      (1, "world")
+  //             (2, "hello")
+  //                   ==> window1: {(1,["hello","world"]), (2, ["hello"])}
+  //                                 (1, "a")
+  //                                                       (2,"a")
+  //                                                             (3,"a")
+  //                                                                  (2,"b")
+  //                                                       => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
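+  //
+  // For reference, the first watermark (1003) closes window0 (ending at 1000), watermark3 (2100)
+  // closes window1 [0, 2000), and watermark4 (3000) closes window2 [1000, 3000); watermark2 (1400)
+  // closes no window, which is why nothing is emitted for it below.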
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test_gbk() {
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
+      .every(Duration.standardSeconds(1));
+
+    final GBKTransform<String, String, Iterable<String>> doFnTransform =
+      new GBKTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(slidingWindows),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(STRING_CODER),
+        DoFnSchemaInformation.create(),
+        DisplayData.none());
+
+    final Instant ts1 = new Instant(1);
+    final Instant ts2 = new Instant(100);
+    final Instant ts3 = new Instant(300);
+    final Watermark watermark = new Watermark(1003);
+    final Instant ts4 = new Instant(1200);
+    final Watermark watermark2 = new Watermark(1400);
+    final Instant ts5 = new Instant(1600);
+    final Instant ts6 = new Instant(1800);
+    final Instant ts7 = new Instant(1900);
+    final Watermark watermark3 = new Watermark(2100);
+    final Instant ts8 = new Instant(2200);
+    final Instant ts9 = new Instant(2300);
+    final Watermark watermark4 = new Watermark(3000);
+
+
+    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [-1000---1000)
+    final IntervalWindow window0 = sortedWindows.get(0);
+    // [0---2000)
+    final IntervalWindow window1 = sortedWindows.get(1);
+
+    sortedWindows.clear();
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+    Collections.sort(sortedWindows, IntervalWindow::compareTo);
+
+    // [1000--3000)
+    final IntervalWindow window2 = sortedWindows.get(1);
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
+
+    doFnTransform.onWatermark(watermark);
+
+    // output
+    // 1: ["hello", "world"]
+    // 2: ["hello"]
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
+    checkOutput2(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
+    checkOutput2(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    assertEquals(2, oc.outputs.size());
+    assertEquals(2, oc.watermarks.size());
+
+    // check output watermark
+    assertEquals(1000,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
+
+
+    doFnTransform.onWatermark(watermark2);
+
+    assertEquals(0, oc.outputs.size()); // do not emit anything
+    assertEquals(0, oc.watermarks.size());
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
+
+    // emit window1
+    doFnTransform.onWatermark(watermark3);
+
+    // output
+    // 1: ["hello", "world", "a"]
+    // 2: ["hello"]
+    // 3: ["a", "a", "b"]
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
+    checkOutput2(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 2
+    assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
+    checkOutput2(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
+    checkOutput2(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
+
+    // check output watermark
+    assertEquals(2000,
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
+
+    // emit window2
+    doFnTransform.onWatermark(watermark4);
+
+    // output
+    // 1: ["a", "a"]
+    // 3: ["a", "a", "b", "b"]
+    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
+
+    assertEquals(2, oc.outputs.size());
+
+    // windowed result for key 1
+    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
+    checkOutput2(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
+
+    // windowed result for key 3
+    assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
+    checkOutput2(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
+
+    // check output watermark
+    assertEquals(3000,
+      oc.watermarks.get(0).getTimestamp());
+
+    doFnTransform.close();
+  }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void test_gbk_eventTimeTrigger() {
+    final Duration lateness = Duration.standardSeconds(1);
+    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+      // early firing
+      .withEarlyFirings(
+        AfterProcessingTime
+          .pastFirstElementInPane()
+          // early firing 1 sec after receiving an element
+          .plusDelayOf(Duration.millis(1000)))
+      // late firing: Fire on any late data.
+      .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+    final FixedWindows window = (FixedWindows) Window.into(
+      FixedWindows.of(Duration.standardSeconds(5)))
+      // lateness
+      .withAllowedLateness(lateness)
+      .triggering(trigger)
+      // TODO #308: Test discarding of refinements
+      .accumulatingFiredPanes().getWindowFn();
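+
+    // For reference, a pane of this 5 s fixed window can fire in three ways under this trigger:
+    // EARLY about 1 s of processing time after the first element of the pane, ON_TIME once the
+    // watermark passes the end of the window, and LATE for each element arriving within the 1 s
+    // allowed lateness; panes accumulate across firings because of accumulatingFiredPanes().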
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    final GBKTransform<String, String, Iterable<String>> doFnTransform =
+      new GBKTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(window).withTrigger(trigger)
+          .withMode(ACCUMULATING_FIRED_PANES)
+          .withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(STRING_CODER),
+        DoFnSchemaInformation.create(),
+        DisplayData.none());
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    // early firing is not related to the watermark progress
+    doFnTransform.onWatermark(new Watermark(2));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    oc.outputs.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+    // EARLY firing... waiting >= 1 sec
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    // GBKTransform emits data when receiving watermark
+    // TODO #250: element-wise processing
+    doFnTransform.onWatermark(new Watermark(5));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    // ACCUMULATION MODE
+    checkOutput2(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // ON TIME
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(5001));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+    // ACCUMULATION MODE
+    checkOutput2(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // actual window: [0-5000)
+    // allowed lateness: 1000 (ms)
+    // current watermark: 5001
+    // data timestamp: 1000
+    // End of current window + allowed lateness >= current watermark (4999 + 1000 >= 5001)
+    // so it should be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "bye!"), new Instant(1000),
+      window.assignWindow(new Instant(1000)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6000));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    // The data should be accumulated to the previous window because the allowed lateness is 1 second
+    checkOutput2(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // actual window: [0-5000)
+    // allowed lateness: 1000 (ms)
+    // data timestamp: 4800
+    // current watermark: 6000
+    // End of current window + allowed lateness < current watermark (4999 + 1000 < 6000)
+    // It should not be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello again!"), new Instant(4800),
+      window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6300));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    checkOutput2(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+    doFnTransform.close();
+  }
+}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
deleted file mode 100644
index 1af392c..0000000
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.common.punctuation.Watermark;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
-import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-public final class GroupByKeyAndWindowDoFnTransformTest {
-  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
-  private final static Coder NULL_INPUT_CODER = null;
-  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
-
-  private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
-
-    // check key
-    assertEquals(expected.getKey(), result.getKey());
-
-    // check value
-    final List<String> resultValue = new ArrayList<>();
-    final List<String> expectedValue = new ArrayList<>(expected.getValue());
-    result.getValue().iterator().forEachRemaining(resultValue::add);
-    Collections.sort(resultValue);
-    Collections.sort(expectedValue);
-
-    assertEquals(expectedValue, resultValue);
-  }
-
-
-  // window size: 2 sec
-  // interval size: 1 sec
-  //
-  //                           [--------------window2------------------------------]
-  // [----------------------- window1 --------------------------]
-  // [-------window0-------]
-  // ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
-  // (1, "hello")
-  //      (1, "world")
-  //             (2, "hello")
-  //                   ==> window1: {(1,["hello","world"]), (2, ["hello"])}
-  //                                 (1, "a")
-  //                                                       (2,"a")
-  //                                                             (3,"a")
-  //                                                                  (2,"b")
-  //                                                       => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
-  @Test
-  @SuppressWarnings("unchecked")
-  public void test() {
-
-    final TupleTag<String> outputTag = new TupleTag<>("main-output");
-    final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
-      .every(Duration.standardSeconds(1));
-
-    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
-      new GroupByKeyAndWindowDoFnTransform(
-        NULL_OUTPUT_CODERS,
-        outputTag,
-        WindowingStrategy.of(slidingWindows),
-        PipelineOptionsFactory.as(NemoPipelineOptions.class),
-        SystemReduceFn.buffering(NULL_INPUT_CODER),
-        DisplayData.none());
-
-    final Instant ts1 = new Instant(1);
-    final Instant ts2 = new Instant(100);
-    final Instant ts3 = new Instant(300);
-    final Watermark watermark = new Watermark(1003);
-    final Instant ts4 = new Instant(1200);
-    final Watermark watermark2 = new Watermark(1400);
-    final Instant ts5 = new Instant(1600);
-    final Instant ts6 = new Instant(1800);
-    final Instant ts7 = new Instant(1900);
-    final Watermark watermark3 = new Watermark(2100);
-    final Instant ts8 = new Instant(2200);
-    final Instant ts9 = new Instant(2300);
-    final Watermark watermark4 = new Watermark(3000);
-
-
-    List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
-    Collections.sort(sortedWindows, IntervalWindow::compareTo);
-
-    // [0---1000)
-    final IntervalWindow window0 = sortedWindows.get(0);
-    // [0---2000)
-    final IntervalWindow window1 = sortedWindows.get(1);
-
-    sortedWindows.clear();
-    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
-    Collections.sort(sortedWindows, IntervalWindow::compareTo);
-
-    // [1000--3000)
-    final IntervalWindow window2 = sortedWindows.get(1);
-
-
-    final Transform.Context context = mock(Transform.Context.class);
-    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
-    doFnTransform.prepare(context, oc);
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
-
-    doFnTransform.onWatermark(watermark);
-
-    // output
-    // 1: ["hello", "world"]
-    // 2: ["hello"]
-    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
-    // windowed result for key 1
-    assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
-
-    // windowed result for key 2
-    assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
-    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
-
-    assertEquals(2, oc.outputs.size());
-    assertEquals(1, oc.watermarks.size());
-
-    // check output watermark
-    assertEquals(1000,
-      oc.watermarks.get(0).getTimestamp());
-
-    oc.outputs.clear();
-    oc.watermarks.clear();
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
-
-
-    doFnTransform.onWatermark(watermark2);
-
-    assertEquals(0, oc.outputs.size()); // do not emit anything
-    assertEquals(0, oc.watermarks.size());
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
-
-    // emit window1
-    doFnTransform.onWatermark(watermark3);
-
-    // output
-    // 1: ["hello", "world", "a"]
-    // 2: ["hello"]
-    // 3: ["a", "a", "b"]
-    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
-
-    // windowed result for key 1
-    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
-
-    // windowed result for key 2
-    assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
-    checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
-
-    // windowed result for key 3
-    assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
-    checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
-
-    // check output watermark
-    assertEquals(2000,
-      oc.watermarks.get(0).getTimestamp());
-
-    oc.outputs.clear();
-    oc.watermarks.clear();
-
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
-
-    // emit window2
-    doFnTransform.onWatermark(watermark4);
-
-    // output
-    // 1: ["a", "a"]
-    // 3: ["a", "a", "b", "b"]
-    Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
-
-    assertEquals(2, oc.outputs.size());
-
-    // windowed result for key 1
-    assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
-
-    // windowed result for key 3
-    assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
-    checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
-
-    // check output watermark
-    assertEquals(3000,
-      oc.watermarks.get(0).getTimestamp());
-
-    doFnTransform.close();
-  }
-
-  /**
-   * Test complex triggers that emit early and late firing.
-   */
-  @Test
-  public void eventTimeTriggerTest() {
-    final Duration lateness = Duration.standardSeconds(1);
-    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
-      // early firing
-      .withEarlyFirings(
-        AfterProcessingTime
-          .pastFirstElementInPane()
-          // early firing 1 sec after receiving an element
-          .plusDelayOf(Duration.millis(1000)))
-      // late firing: Fire on any late data.
-      .withLateFirings(AfterPane.elementCountAtLeast(1));
-
-    final FixedWindows window = (FixedWindows) Window.into(
-      FixedWindows.of(Duration.standardSeconds(5)))
-      // lateness
-      .withAllowedLateness(lateness)
-      .triggering(trigger)
-      // TODO #308: Test discarding of refinements
-      .accumulatingFiredPanes().getWindowFn();
-
-    final TupleTag<String> outputTag = new TupleTag<>("main-output");
-    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
-      new GroupByKeyAndWindowDoFnTransform(
-        NULL_OUTPUT_CODERS,
-        outputTag,
-        WindowingStrategy.of(window).withTrigger(trigger)
-          .withMode(ACCUMULATING_FIRED_PANES)
-          .withAllowedLateness(lateness),
-        PipelineOptionsFactory.as(NemoPipelineOptions.class),
-        SystemReduceFn.buffering(NULL_INPUT_CODER),
-        DisplayData.none());
-
-
-    final Transform.Context context = mock(Transform.Context.class);
-    final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
-    doFnTransform.prepare(context, oc);
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
-
-    // early firing is not related to the watermark progress
-    doFnTransform.onWatermark(new Watermark(2));
-    assertEquals(1, oc.outputs.size());
-    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
-    LOG.info("Output: {}", oc.outputs.get(0));
-    oc.outputs.clear();
-
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
-    // EARLY firing... waiting >= 1 sec
-    try {
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    // GBKTransform emits data when receiving watermark
-    // TODO #250: element-wise processing
-    doFnTransform.onWatermark(new Watermark(5));
-    assertEquals(1, oc.outputs.size());
-    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
-    // ACCUMULATION MODE
-    checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
-    LOG.info("Output: {}", oc.outputs.get(0));
-    oc.outputs.clear();
-
-    // ON TIME
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
-    doFnTransform.onWatermark(new Watermark(5001));
-    assertEquals(1, oc.outputs.size());
-    assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
-    LOG.info("Output: {}", oc.outputs.get(0));
-    // ACCUMULATION MODE
-    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
-    oc.outputs.clear();
-
-    // LATE DATA
-    // actual window: [0-5000)
-    // allowed lateness: 1000 (ms)
-    // current watermark: 5001
-    // data: 4500
-    // the data timestamp + allowed lateness > current watermark,
-    // so it should be accumulated to the prev window
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "bye!"), new Instant(4500),
-      window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
-    doFnTransform.onWatermark(new Watermark(6000));
-    assertEquals(1, oc.outputs.size());
-    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
-    LOG.info("Output: {}", oc.outputs.get(0));
-    // The data should  be accumulated to the previous window because it allows 1 second lateness
-    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
-    oc.outputs.clear();
-
-    // LATE DATA
-    // data timestamp: 4800
-    // current watermark: 6000
-    // data timestamp + allowed lateness < current watermark
-    // It should not be accumulated to the prev window
-    doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello again!"), new Instant(4800),
-      window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
-    doFnTransform.onWatermark(new Watermark(6300));
-    assertEquals(1, oc.outputs.size());
-    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
-    LOG.info("Output: {}", oc.outputs.get(0));
-    checkOutput(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
-    oc.outputs.clear();
-
-
-    doFnTransform.close();
-
-  }
-}