You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/11 04:30:07 UTC

[2/2] apex-malhar git commit: Introduced WindowedMergeOperatorFeatures classes to solve the problem of code duplication

Introduced WindowedMergeOperatorFeatures classes to solve the problem of code duplication


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/aeb10f33
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/aeb10f33
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/aeb10f33

Branch: refs/heads/master
Commit: aeb10f33d54b6b661cb4f776a4cad0e41d5375c3
Parents: 92bd732
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 26 16:47:11 2016 -0700
Committer: Shunxin <lu...@hotmail.com>
Committed: Mon Oct 10 19:07:25 2016 -0700

----------------------------------------------------------------------
 .../malhar/lib/window/MergeAccumulation.java    |   2 +-
 .../lib/window/MergeWindowedOperator.java       |  12 +-
 .../impl/AbstractWindowedMergeOperator.java     | 123 ------------
 .../window/impl/AbstractWindowedOperator.java   |  34 +++-
 .../impl/KeyedWindowedMergeOperatorImpl.java    | 105 +++++------
 .../impl/WindowedMergeOperatorFeatures.java     | 185 +++++++++++++++++++
 .../window/impl/WindowedMergeOperatorImpl.java  | 106 ++++++-----
 .../window/impl/WindowedMergeOperatorTest.java  |   2 +-
 8 files changed, 320 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
index 71f4408..53cfd40 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeAccumulation.java
@@ -21,7 +21,7 @@ package org.apache.apex.malhar.lib.window;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * This is the interface for accumulation when joining multiple streams.
+ * This is the interface for accumulation when joining two streams.
  *
  * @since 3.5.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
index 89a70a4..1561caa 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/MergeWindowedOperator.java
@@ -19,12 +19,22 @@
 package org.apache.apex.malhar.lib.window;
 
 /**
- * Interface for Join Windowed Operator.
+ * Interface for Merge Windowed Operator.
  */
 public interface MergeWindowedOperator<InputT1, InputT2>
     extends WindowedOperator<InputT1>
 {
+  /**
+   * The method to accumulate the data tuple from the 2nd input stream
+   *
+   * @param tuple the data tuple
+   */
   void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
 
+  /**
+   * The method to process the watermark tuple from the 2nd input stream
+   *
+   * @param watermark the watermark tuple
+   */
   void processWatermark2(ControlTuple.Watermark watermark);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
deleted file mode 100644
index 05a2495..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedMergeOperator.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.apex.malhar.lib.window.ControlTuple;
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
-import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
-
-import com.google.common.base.Function;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-
-
-/**
- * Abstract Windowed Merge Operator.
- */
-public abstract class AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, DataStorageT extends WindowedStorage,
-    RetractionStorageT extends WindowedStorage, AccumulationT extends
-    MergeAccumulation>
-    extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT>
-    implements MergeWindowedOperator<InputT1, InputT2>
-{
-
-  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedMergeOperator.class);
-  private Function<InputT2, Long> timestampExtractor2;
-
-  private long latestWatermark1 = -1;  // latest watermark from stream 1
-  private long latestWatermark2 = -1;  // latest watermark from stream 2
-
-  public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>()
-  {
-    @Override
-    public void process(Tuple<InputT2> tuple)
-    {
-      processTuple2(tuple);
-    }
-  };
-
-  // TODO: This port should be removed when Apex Core has native support for custom control tuples
-  @InputPortFieldAnnotation(optional = true)
-  public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
-  {
-    @Override
-    public void process(ControlTuple tuple)
-    {
-      if (tuple instanceof ControlTuple.Watermark) {
-        processWatermark2((ControlTuple.Watermark)tuple);
-      }
-    }
-  };
-
-  public void processTuple2(Tuple<InputT2> tuple)
-  {
-    long timestamp = extractTimestamp(tuple, timestampExtractor2);
-    if (isTooLate(timestamp)) {
-      dropTuple(tuple);
-    } else {
-      Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
-      // do the accumulation
-      accumulateTuple2(windowedTuple);
-      processWindowState(windowedTuple);
-    }
-  }
-
-  public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor2)
-  {
-    this.timestampExtractor2 = timestampExtractor2;
-  }
-
-
-  @Override
-  public void processWatermark(ControlTuple.Watermark watermark)
-  {
-    latestWatermark1 = watermark.getTimestamp();
-    if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
-      // Select the smallest timestamp of the latest watermarks as the watermark of the operator.
-      long minWatermark = Math.min(latestWatermark1, latestWatermark2);
-      if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
-        this.watermarkTimestamp = minWatermark;
-      }
-    }
-  }
-
-  @Override
-  public void processWatermark2(ControlTuple.Watermark watermark)
-  {
-    latestWatermark2 = watermark.getTimestamp();
-    if (latestWatermark1 >= 0 && latestWatermark2 >= 0) {
-      long minWatermark = Math.min(latestWatermark1, latestWatermark2);
-      if (this.watermarkTimestamp < 0 || minWatermark != this.watermarkTimestamp) {
-        this.watermarkTimestamp = minWatermark;
-      }
-    }
-  }
-
-  @Override
-  protected void processWatermarkAtEndWindow()
-  {
-    if (fixedWatermarkMillis > 0 || this.watermarkTimestamp != this.currentWatermark) {
-      super.processWatermarkAtEndWindow();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
index f965a01..e8ff622 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
@@ -79,8 +79,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
 
   private Function<InputT, Long> timestampExtractor;
 
+  protected long nextWatermark = -1;
   protected long currentWatermark = -1;
-  protected long watermarkTimestamp = -1;
   private boolean triggerAtWatermark;
   protected long earlyTriggerCount;
   private long earlyTriggerMillis;
@@ -141,7 +141,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     if (isTooLate(timestamp)) {
       dropTuple(tuple);
     } else {
-      Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValue(tuple);
+      Tuple.WindowedTuple<InputT> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
       // do the accumulation
       accumulateTuple(windowedTuple);
       processWindowState(windowedTuple);
@@ -256,6 +256,11 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     this.timestampExtractor = timestampExtractor;
   }
 
+  public void setNextWatermark(long timestamp)
+  {
+    this.nextWatermark = timestamp;
+  }
+
   /**
    * Sets the fixed watermark with respect to the processing time derived from the Apex window ID. This is useful if we
    * don't have watermark tuples from upstream. However, using this means whether a tuple is considered late totally
@@ -408,7 +413,7 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   @Override
   public void processWatermark(ControlTuple.Watermark watermark)
   {
-    this.watermarkTimestamp = watermark.getTimestamp();
+    this.nextWatermark = watermark.getTimestamp();
   }
 
   @Override
@@ -460,7 +465,6 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     } else {
       currentDerivedTimestamp += timeIncrement;
     }
-    watermarkTimestamp = -1;
   }
 
   /**
@@ -484,18 +488,17 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
   protected void processWatermarkAtEndWindow()
   {
     if (fixedWatermarkMillis > 0) {
-      watermarkTimestamp = currentDerivedTimestamp - fixedWatermarkMillis;
+      nextWatermark = currentDerivedTimestamp - fixedWatermarkMillis;
     }
-    if (watermarkTimestamp > 0) {
-      this.currentWatermark = watermarkTimestamp;
+    if (nextWatermark > 0 && currentWatermark < nextWatermark) {
 
-      long horizon = watermarkTimestamp - allowedLatenessMillis;
+      long horizon = nextWatermark - allowedLatenessMillis;
 
       for (Iterator<Map.Entry<Window, WindowState>> it = windowStateMap.entries().iterator(); it.hasNext(); ) {
         Map.Entry<Window, WindowState> entry = it.next();
         Window window = entry.getKey();
         WindowState windowState = entry.getValue();
-        if (window.getBeginTimestamp() + window.getDurationMillis() < watermarkTimestamp) {
+        if (window.getBeginTimestamp() + window.getDurationMillis() < nextWatermark) {
           // watermark has not arrived for this window before, marking this window late
           if (windowState.watermarkArrivalTime == -1) {
             windowState.watermarkArrivalTime = currentDerivedTimestamp;
@@ -514,7 +517,8 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
           }
         }
       }
-      controlOutput.emit(new WatermarkImpl(watermarkTimestamp));
+      controlOutput.emit(new WatermarkImpl(nextWatermark));
+      this.currentWatermark = nextWatermark;
     }
   }
 
@@ -552,6 +556,16 @@ public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT ext
     }
   }
 
+  DataStorageT getDataStorage()
+  {
+    return dataStorage;
+  }
+
+  AccumulationT getAccumulation()
+  {
+    return accumulation;
+  }
+
   /**
    * This method fires the normal trigger for the given window.
    *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
index a5f17c5..3714d6d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedMergeOperatorImpl.java
@@ -18,14 +18,14 @@
  */
 package org.apache.apex.malhar.lib.window.impl;
 
-import java.util.Map;
-
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
 import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
 
+import com.google.common.base.Function;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.lib.util.KeyValPair;
 
 
@@ -40,81 +40,68 @@ import com.datatorrent.lib.util.KeyValPair;
  * @param <OutputT> The type of the value of the keyed output tuple.
  */
 public class KeyedWindowedMergeOperatorImpl<KeyT, InputT1, InputT2, AccumT, OutputT>
-    extends AbstractWindowedMergeOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, KeyValPair<KeyT, OutputT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+    extends KeyedWindowedOperatorImpl<KeyT, InputT1, AccumT, OutputT>
+    implements MergeWindowedOperator<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>>
 {
-  // TODO: Add session window support.
+  private Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor2;
+
+  private WindowedMergeOperatorFeatures.Keyed joinFeatures = new WindowedMergeOperatorFeatures.Keyed(this);
 
-  private abstract class AccumFunction<T>
+  public final transient DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>> input2 = new DefaultInputPort<Tuple<KeyValPair<KeyT, InputT2>>>()
   {
-    abstract AccumT accumulate(AccumT accum, T value);
-  }
+    @Override
+    public void process(Tuple<KeyValPair<KeyT, InputT2>> tuple)
+    {
+      processTuple2(tuple);
+    }
+  };
 
-  private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn)
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
   {
-    final KeyValPair<KeyT, T> kvData = tuple.getValue();
-    KeyT key = kvData.getKey();
-    for (Window window : tuple.getWindows()) {
-      // process each window
-      AccumT accum = dataStorage.get(window, key);
-      if (accum == null) {
-        accum = accumulation.defaultAccumulatedValue();
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark2((ControlTuple.Watermark)tuple);
       }
-      dataStorage.put(window, key, accumFn.accumulate(accum, kvData.getValue()));
     }
+  };
+
+  public void setTimestampExtractor2(Function<KeyValPair<KeyT, InputT2>, Long> timestampExtractor)
+  {
+    this.timestampExtractor2 = timestampExtractor;
   }
 
-  @Override
-  public void accumulateTuple(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple)
+  public void processTuple2(Tuple<KeyValPair<KeyT, InputT2>> tuple)
   {
-    accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
-    {
-      @Override
-      AccumT accumulate(AccumT accum, InputT1 value)
-      {
-        return accumulation.accumulate(accum, value);
-      }
-    });
+    long timestamp = extractTimestamp(tuple, this.timestampExtractor2);
+    if (isTooLate(timestamp)) {
+      dropTuple(tuple);
+    } else {
+      Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
+      // do the accumulation
+      accumulateTuple2(windowedTuple);
+      processWindowState(windowedTuple);
+    }
   }
 
   @Override
   public void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple)
   {
-    accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
-    {
-      @Override
-      AccumT accumulate(AccumT accum, InputT2 value)
-      {
-        return accumulation.accumulate2(accum, value);
-      }
-    });
+    joinFeatures.accumulateTuple2(tuple);
   }
 
   @Override
-  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  public void processWatermark(ControlTuple.Watermark watermark)
   {
-    for (Map.Entry<KeyT, AccumT> entry : dataStorage.entries(window)) {
-      OutputT outputVal = accumulation.getOutput(entry.getValue());
-      if (fireOnlyUpdatedPanes) {
-        OutputT oldValue = retractionStorage.get(window, entry.getKey());
-        if (oldValue != null && oldValue.equals(outputVal)) {
-          continue;
-        }
-      }
-      output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), outputVal)));
-      if (retractionStorage != null) {
-        retractionStorage.put(window, entry.getKey(), outputVal);
-      }
-    }
+    joinFeatures.processWatermark1(watermark);
   }
 
   @Override
-  public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+  public void processWatermark2(ControlTuple.Watermark watermark)
   {
-    if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-      throw new UnsupportedOperationException();
-    }
-    for (Map.Entry<KeyT, OutputT> entry : retractionStorage.entries(window)) {
-      output.emit(new Tuple.WindowedTuple<>(window, new KeyValPair<>(entry.getKey(), accumulation.getRetraction(entry.getValue()))));
-    }
+    joinFeatures.processWatermark2(watermark);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
new file mode 100644
index 0000000..3fceb06
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorFeatures.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeAccumulation;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * This class provides the features of a MergeWindowedOperator and is intended to be used only
+ * by implementations of such an operator.
+ */
+abstract class WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation, DataStorageT extends WindowedStorage>
+{
+  protected AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator;
+
+  protected long latestWatermark1 = -1;  // latest watermark from stream 1
+  protected long latestWatermark2 = -1;  // latest watermark from stream 2
+
+  protected abstract class AccumFunction<T>
+  {
+    abstract AccumT accumulate(AccumT accum, T value);
+  }
+
+  protected WindowedMergeOperatorFeatures()
+  {
+    // for kryo
+  }
+
+  WindowedMergeOperatorFeatures(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator)
+  {
+    this.operator = operator;
+  }
+
+  abstract void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple);
+
+  abstract void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
+
+  void processWatermark1(ControlTuple.Watermark watermark)
+  {
+    latestWatermark1 = watermark.getTimestamp();
+    // Select the smallest timestamp of the latest watermarks as the watermark of the operator.
+    long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+    operator.setNextWatermark(minWatermark);
+  }
+
+  void processWatermark2(ControlTuple.Watermark watermark)
+  {
+    latestWatermark2 = watermark.getTimestamp();
+    long minWatermark = Math.min(latestWatermark1, latestWatermark2);
+    operator.setNextWatermark(minWatermark);
+  }
+
+  /**
+   * The merge features for a plain (non-keyed) operator
+   */
+  static class Plain<InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedPlainStorage<AccumT>>
+      extends WindowedMergeOperatorFeatures<InputT1, InputT2, AccumT, AccumulationT, DataStorageT>
+  {
+    private Plain()
+    {
+      // for kryo
+    }
+
+    Plain(AbstractWindowedOperator<InputT1, ?, DataStorageT, ?, AccumulationT> operator)
+    {
+      super(operator);
+    }
+
+    private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn)
+    {
+      for (Window window : tuple.getWindows()) {
+        // process each window
+        AccumT accum = operator.getDataStorage().get(window);
+        if (accum == null) {
+          accum = operator.getAccumulation().defaultAccumulatedValue();
+        }
+        operator.getDataStorage().put(window, accumFn.accumulate(accum, tuple.getValue()));
+      }
+    }
+
+    @Override
+    void accumulateTuple1(Tuple.WindowedTuple<InputT1> tuple)
+    {
+      accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+      {
+        @Override
+        AccumT accumulate(AccumT accum, InputT1 value)
+        {
+          return operator.getAccumulation().accumulate(accum, value);
+        }
+      });
+    }
+
+    @Override
+    void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+    {
+      accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+      {
+        @Override
+        AccumT accumulate(AccumT accum, InputT2 value)
+        {
+          return operator.getAccumulation().accumulate2(accum, value);
+        }
+      });
+    }
+  }
+
+  /**
+   * The merge features for a keyed operator
+   */
+  static class Keyed<KeyT, InputT1, InputT2, AccumT, AccumulationT extends MergeAccumulation<InputT1, InputT2, AccumT, ?>, DataStorageT extends WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>>
+      extends WindowedMergeOperatorFeatures<KeyValPair<KeyT, InputT1>, KeyValPair<KeyT, InputT2>, AccumT, AccumulationT, DataStorageT>
+  {
+    private Keyed()
+    {
+      // for kryo
+    }
+
+    Keyed(AbstractWindowedOperator<KeyValPair<KeyT, InputT1>, ?, DataStorageT, ?, AccumulationT> operator)
+    {
+      super(operator);
+    }
+
+    private <T> void accumulateTupleHelper(Tuple.WindowedTuple<KeyValPair<KeyT, T>> tuple, AccumFunction<T> accumFn)
+    {
+      final KeyValPair<KeyT, T> kvData = tuple.getValue();
+      KeyT key = kvData.getKey();
+      for (Window window : tuple.getWindows()) {
+        // process each window
+        AccumT accum = operator.getDataStorage().get(window, key);
+        if (accum == null) {
+          accum = operator.getAccumulation().defaultAccumulatedValue();
+        }
+        operator.getDataStorage().put(window, key, accumFn.accumulate(accum, kvData.getValue()));
+      }
+    }
+
+    @Override
+    void accumulateTuple1(Tuple.WindowedTuple<KeyValPair<KeyT, InputT1>> tuple)
+    {
+      accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
+      {
+        @Override
+        AccumT accumulate(AccumT accum, InputT1 value)
+        {
+          return operator.getAccumulation().accumulate(accum, value);
+        }
+      });
+    }
+
+    @Override
+    void accumulateTuple2(Tuple.WindowedTuple<KeyValPair<KeyT, InputT2>> tuple)
+    {
+      accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
+      {
+        @Override
+        AccumT accumulate(AccumT accum, InputT2 value)
+        {
+          return operator.getAccumulation().accumulate2(accum, value);
+        }
+      });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
index 38eeff0..0f8a762 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorImpl.java
@@ -18,11 +18,14 @@
  */
 package org.apache.apex.malhar.lib.window.impl;
 
-import org.apache.apex.malhar.lib.window.MergeAccumulation;
-import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.MergeWindowedOperator;
 import org.apache.apex.malhar.lib.window.Tuple;
-import org.apache.apex.malhar.lib.window.Window;
-import org.apache.apex.malhar.lib.window.WindowedStorage;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 
 /**
  * Windowed Merge Operator to merge two streams together. It aggregates tuple from two
@@ -35,78 +38,73 @@ import org.apache.apex.malhar.lib.window.WindowedStorage;
  * @param <OutputT> The type of output tuple.
  */
 public class WindowedMergeOperatorImpl<InputT1, InputT2, AccumT, OutputT>
-    extends AbstractWindowedMergeOperator<InputT1, InputT2, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, MergeAccumulation<InputT1, InputT2, AccumT, OutputT>>
+    extends WindowedOperatorImpl<InputT1, AccumT, OutputT> implements MergeWindowedOperator<InputT1, InputT2>
 {
-  private abstract class AccumFunction<T>
+  private Function<InputT2, Long> timestampExtractor2;
+
+  private WindowedMergeOperatorFeatures.Plain joinFeatures = new WindowedMergeOperatorFeatures.Plain(this);
+
+  public final transient DefaultInputPort<Tuple<InputT2>> input2 = new DefaultInputPort<Tuple<InputT2>>()
   {
-    abstract AccumT accumulate(AccumT accum, T value);
-  }
+    @Override
+    public void process(Tuple<InputT2> tuple)
+    {
+      processTuple2(tuple);
+    }
+  };
 
-  private <T> void accumulateTupleHelper(Tuple.WindowedTuple<T> tuple, AccumFunction<T> accumFn)
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput2 = new DefaultInputPort<ControlTuple>()
   {
-    for (Window window : tuple.getWindows()) {
-      // process each window
-      AccumT accum = dataStorage.get(window);
-      if (accum == null) {
-        accum = accumulation.defaultAccumulatedValue();
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark2((ControlTuple.Watermark)tuple);
       }
-      dataStorage.put(window, accumFn.accumulate(accum, tuple.getValue()));
     }
+  };
+
+  public void setTimestampExtractor2(Function<InputT2, Long> timestampExtractor)
+  {
+    this.timestampExtractor2 = timestampExtractor;
   }
 
-  @Override
-  public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
+  public void processTuple2(Tuple<InputT2> tuple)
   {
-    accumulateTupleHelper(tuple, new AccumFunction<InputT2>()
-    {
-      @Override
-      AccumT accumulate(AccumT accum, InputT2 value)
-      {
-        return accumulation.accumulate2(accum, value);
-      }
-    });
+    long timestamp = extractTimestamp(tuple, this.timestampExtractor2);
+    if (isTooLate(timestamp)) {
+      dropTuple(tuple);
+    } else {
+      Tuple.WindowedTuple<InputT2> windowedTuple = getWindowedValueWithTimestamp(tuple, timestamp);
+      // do the accumulation
+      accumulateTuple2(windowedTuple);
+      processWindowState(windowedTuple);
+    }
   }
 
   @Override
   public void accumulateTuple(Tuple.WindowedTuple<InputT1> tuple)
   {
-    accumulateTupleHelper(tuple, new AccumFunction<InputT1>()
-    {
-      @Override
-      AccumT accumulate(AccumT accum, InputT1 value)
-      {
-        return accumulation.accumulate(accum, value);
-      }
-    });
+    joinFeatures.accumulateTuple1(tuple);
   }
 
   @Override
-  public void fireNormalTrigger(Window window, boolean fireOnlyUpdatedPanes)
+  public void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple)
   {
-    AccumT accumulatedValue = dataStorage.get(window);
-    OutputT outputValue = accumulation.getOutput(accumulatedValue);
+    joinFeatures.accumulateTuple2(tuple);
+  }
 
-    if (fireOnlyUpdatedPanes) {
-      OutputT oldValue = retractionStorage.get(window);
-      if (oldValue != null && oldValue.equals(outputValue)) {
-        return;
-      }
-    }
-    output.emit(new Tuple.WindowedTuple<>(window, outputValue));
-    if (retractionStorage != null) {
-      retractionStorage.put(window, outputValue);
-    }
+  @Override
+  public void processWatermark(ControlTuple.Watermark watermark)
+  {
+    joinFeatures.processWatermark1(watermark);
   }
 
   @Override
-  public void fireRetractionTrigger(Window window, boolean firingOnlyUpdatedPanes)
+  public void processWatermark2(ControlTuple.Watermark watermark)
   {
-    if (triggerOption.getAccumulationMode() != TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING) {
-      throw new UnsupportedOperationException();
-    }
-    OutputT oldValue = retractionStorage.get(window);
-    if (oldValue != null) {
-      output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(oldValue)));
-    }
+    joinFeatures.processWatermark2(watermark);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/aeb10f33/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
index 7dc09d0..8c37d57 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.java
@@ -130,8 +130,8 @@ public class WindowedMergeOperatorTest
 
     // Current watermark of Merge operator could only change during endWindow() event.
     op.controlInput.process(new WatermarkImpl(1100000));
-    Assert.assertEquals(1100000, op.currentWatermark);
     op.endWindow();
+    Assert.assertEquals(1100000, op.currentWatermark);
     Assert.assertEquals(3, sink.collectedTuples.size());
 
     // If the upstreams sent a watermark but the minimum of the latest input watermarks doesn't change, the Merge