Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/05 05:32:04 UTC

[1/2] apex-malhar git commit: APEXMALHAR-1701 An abstract deduper implementation

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0ec1433b2 -> 7dea3d0a0


APEXMALHAR-1701 An abstract deduper implementation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a1f62669
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a1f62669
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a1f62669

Branch: refs/heads/master
Commit: a1f62669aaa46d2e38c42a8755baf4dc66d3246f
Parents: 0ec1433
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Apr 29 11:43:33 2016 -0700
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Tue Aug 2 17:29:22 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  | 177 +++++++++++++++++++
 1 file changed, 177 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1f62669/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
new file mode 100644
index 0000000..d23a28a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An operator that de-dupes a stream.
+ *
+ * @param <T> type of events
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener,
+    Operator.IdleTimeHandler
+{
+  @NotNull
+  protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+  private transient long sleepMillis;
+
+  private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap();
+
+  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
+
+  public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>();
+
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      long time = getTime(tuple);
+      Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+      if (valFuture.isDone()) {
+        try {
+          processEvent(tuple, valFuture.get());
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException("process", e);
+        }
+      } else {
+        waitingEvents.put(tuple, valFuture);
+      }
+    }
+  };
+
+  protected abstract long getTime(T tuple);
+
+  protected abstract Slice getKey(T tuple);
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+    managedState.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    managedState.beginWindow(windowId);
+  }
+
+  @Override
+  public void handleIdleTime()
+  {
+    if (waitingEvents.size() > 0) {
+      Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
+      while (waitIterator.hasNext()) {
+        Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
+        if (waitingEvent.getValue().isDone()) {
+          try {
+            processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("handle idle time", e);
+          }
+          waitIterator.remove();
+        }
+      }
+    } else {
+      /* nothing to do here, so sleep for a while to avoid busy loop */
+      try {
+        Thread.sleep(sleepMillis);
+      } catch (InterruptedException ie) {
+        throw new RuntimeException(ie);
+      }
+    }
+  }
+
+  protected void processEvent(T tuple, Slice value)
+  {
+    if (value == BucketedState.EXPIRED) {
+      return;
+    }
+    if (value == null) {
+      //not a duplicate event
+      output.emit(tuple);
+    } else {
+      if (duplicates.isConnected()) {
+        duplicates.emit(tuple);
+      }
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
+    while (waitIterator.hasNext()) {
+      Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
+      try {
+        processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException("end window", e);
+      }
+      waitIterator.remove();
+
+    }
+    managedState.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    managedState.teardown();
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    managedState.beforeCheckpoint(windowId);
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+    managedState.checkpointed(windowId);
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    managedState.committed(windowId);
+  }
+}

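For a sense of how this abstract operator is used, below is a minimal sketch of a concrete subclass. The event class and its field names are illustrative assumptions, not part of this commit; a subclass only has to supply the event time and a serialized key for the managed-state lookup.

  import org.apache.apex.malhar.lib.dedup.AbstractDeduper;
  import com.datatorrent.netlet.util.Slice;

  // Hypothetical event type, for illustration only.
  class ClickEvent
  {
    long timestamp;  // event time in milliseconds
    String userId;   // de-duplication key
  }

  class ClickEventDeduper extends AbstractDeduper<ClickEvent>
  {
    @Override
    protected long getTime(ClickEvent tuple)
    {
      // The event time selects the time bucket used for the state lookup.
      return tuple.timestamp;
    }

    @Override
    protected Slice getKey(ClickEvent tuple)
    {
      // The key bytes are what managedState stores and compares.
      return new Slice(tuple.userId.getBytes());
    }
  }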

[2/2] apex-malhar git commit: APEXMALHAR-1701: Added features to AbstractDeduper. Added Pojo implementation. Added unit tests.

Posted by ch...@apache.org.
APEXMALHAR-1701: Added features to AbstractDeduper. Added Pojo implementation. Added unit tests.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7dea3d0a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7dea3d0a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7dea3d0a

Branch: refs/heads/master
Commit: 7dea3d0a06875cb30cd5dd9f5d6e353f909fc485
Parents: a1f6266
Author: bhupesh <bh...@gmail.com>
Authored: Thu Jul 7 15:52:34 2016 +0530
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Tue Aug 2 19:59:10 2016 +0530

----------------------------------------------------------------------
 .../apex/malhar/lib/dedup/AbstractDeduper.java  | 402 ++++++++++++++++---
 .../malhar/lib/dedup/DeduperStreamCodec.java    |  55 +++
 .../lib/dedup/TimeBasedDedupOperator.java       | 239 +++++++++++
 .../malhar/lib/dedup/DeduperOrderingTest.java   | 176 ++++++++
 .../lib/dedup/DeduperPartitioningTest.java      | 195 +++++++++
 .../lib/dedup/DeduperTimeBasedPOJOImplTest.java | 116 ++++++
 .../apache/apex/malhar/lib/dedup/TestPojo.java  |  82 ++++
 7 files changed, 1203 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index d23a28a..d06acc3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -20,135 +20,368 @@ package org.apache.apex.malhar.lib.dedup;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import javax.validation.constraints.NotNull;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.apex.malhar.lib.state.BucketedState;
 import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
 
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.netlet.util.Slice;
 
 /**
- * An operator that de-dupes a stream.
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports an expiry mechanism based on a configurable expiry period, configured using
+ * {@link TimeBucketAssigner} in {@link ManagedTimeUnifiedStateImpl}.
+ * The following steps identify the state of a particular tuple:
+ * 1. Check whether the time of the tuple falls in an expired bucket. If so, the tuple is expired.
+ * 2. If the tuple is a valid event, the store is checked for the same key in the
+ * time bucket identified by the event time. If so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is unique.
  *
  * @param <T> type of events
  */
+@Evolving
 @OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener,
-    Operator.IdleTimeHandler
+public abstract class AbstractDeduper<T>
+    implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener
 {
+  /**
+   * The input port on which events are received.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public final void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * The output port on which deduped events are emitted.
+   */
+  public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which duplicate events are emitted.
+   */
+  public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+  /**
+   * The output port on which expired events are emitted.
+   */
+  public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+  /**
+   * Whether or not the order of tuples should be maintained.
+   * Setting this to "true" might entail some performance cost, but makes the operator idempotent.
+   */
+  private boolean preserveTupleOrder = true;
+
   @NotNull
   protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
 
-  private transient long sleepMillis;
-
+  /**
+   * Map to hold the result of processing a tuple (unique, duplicate, expired or error) until previous
+   * tuples get processed. This is used only when {@link #preserveTupleOrder} is true.
+   */
+  private transient Map<T, Decision> decisions;
   private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap();
+  private transient Map<Slice, Long> asyncEvents = Maps.newLinkedHashMap();
+
+  // Metrics
+  @AutoMetric
+  private transient long uniqueEvents;
+  @AutoMetric
+  private transient long duplicateEvents;
+  @AutoMetric
+  private transient long expiredEvents;
 
-  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
+  @Override
+  public void setup(OperatorContext context)
+  {
+    FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+    fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data");
+    managedState.setFileAccess(fAccessImpl);
+    managedState.setup(context);
 
-  public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>();
+    if (preserveTupleOrder) {
+      decisions = Maps.newLinkedHashMap();
+    }
+  }
 
-  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  @Override
+  public void beginWindow(long l)
   {
-    @Override
-    public void process(T tuple)
-    {
-      long time = getTime(tuple);
-      Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
-      if (valFuture.isDone()) {
-        try {
-          processEvent(tuple, valFuture.get());
-        } catch (InterruptedException | ExecutionException e) {
-          throw new RuntimeException("process", e);
-        }
+    // Reset Metrics
+    uniqueEvents = 0;
+    duplicateEvents = 0;
+    expiredEvents = 0;
+
+    managedState.beginWindow(l);
+  }
+
+  protected abstract Slice getKey(T event);
+
+  protected abstract long getTime(T event);
+
+  /**
+   * Processes an incoming tuple
+   *
+   * @param tuple the incoming tuple
+   */
+  protected void processTuple(T tuple)
+  {
+
+    long time = getTime(tuple);
+    Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+
+    if (valFuture.isDone()) {
+      try {
+        processEvent(tuple, valFuture.get());
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      processWaitingEvent(tuple, valFuture);
+    }
+  }
+
+  /**
+   * Processes a looked-up event
+   *
+   * @param tuple the incoming tuple
+   * @param value the looked-up value for the tuple's key
+   */
+  protected void processEvent(T tuple, Slice value)
+  {
+    if (value == BucketedState.EXPIRED) {
+      processInvalid(tuple);
+      return;
+    }
+    processValid(tuple, value);
+  }
+
+  /**
+   * Processes a tuple which is waiting for the lookup to return.
+   *
+   * @param tuple The tuple which needs to wait
+   * @param future The future object which will ultimately return the lookup result
+   */
+  protected void processWaitingEvent(T tuple, Future<Slice> future)
+  {
+    waitingEvents.put(tuple, future);
+    if (preserveTupleOrder) {
+      recordDecision(tuple, Decision.UNKNOWN);
+    }
+  }
+
+  /**
+   * Processes a valid (non-expired) tuple. This tuple may be unique or a duplicate.
+   *
+   * @param tuple
+   *          The tuple to be processed
+   * @param value
+   *          Looked-up value for the tuple's key
+   */
+  protected void processValid(T tuple, Slice value)
+  {
+    if (!preserveTupleOrder || waitingEvents.isEmpty()) {
+      if (value == null) {
+        managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+        processUnique(tuple);
       } else {
-        waitingEvents.put(tuple, valFuture);
+        processDuplicate(tuple);
       }
+    } else {
+      processWaitingEvent(tuple, Futures.immediateFuture(value));
     }
-  };
+  }
 
-  protected abstract long getTime(T tuple);
+  /**
+   * Processes invalid tuples.
+   *
+   * @param tuple the incoming tuple
+   */
+  protected void processInvalid(T tuple)
+  {
+    if (preserveTupleOrder && !decisions.isEmpty()) {
+      recordDecision(tuple, Decision.EXPIRED);
+    } else {
+      processExpired(tuple);
+    }
+  }
 
-  protected abstract Slice getKey(T tuple);
+  /**
+   * Processes an expired tuple
+   *
+   * @param tuple the incoming tuple
+   */
+  protected void processExpired(T tuple)
+  {
+    expiredEvents++;
+    emitExpired(tuple);
+  }
 
-  @Override
-  public void setup(Context.OperatorContext context)
+  /**
+   * Processes the duplicate tuple.
+   *
+   * @param tuple
+   *          The tuple which is a duplicate
+   */
+  protected void processDuplicate(T tuple)
   {
-    sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
-    managedState.setup(context);
+    if (preserveTupleOrder && !decisions.isEmpty()) {
+      recordDecision(tuple, Decision.DUPLICATE);
+    } else {
+      duplicateEvents++;
+      emitDuplicate(tuple);
+    }
   }
 
-  @Override
-  public void beginWindow(long windowId)
+  /**
+   * Processes the unique tuple.
+   *
+   * @param tuple
+   *          The tuple which is unique
+   */
+  protected void processUnique(T tuple)
   {
-    managedState.beginWindow(windowId);
+    if (preserveTupleOrder && !decisions.isEmpty()) {
+      recordDecision(tuple, Decision.UNIQUE);
+    } else {
+      uniqueEvents++;
+      emitUnique(tuple);
+    }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void handleIdleTime()
   {
+    if (preserveTupleOrder) {
+      emitProcessedTuples();
+    }
+    processAuxiliary(false);
+  }
+
+  /**
+   * Does any auxiliary processing in the idle time of the operator.
+   * Processes any tuples which are waiting for the lookup to return.
+   *
+   * @param finalize Whether or not to wait for the future to return
+   */
+  protected void processAuxiliary(boolean finalize)
+  {
     if (waitingEvents.size() > 0) {
       Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
       while (waitIterator.hasNext()) {
         Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
-        if (waitingEvent.getValue().isDone()) {
+        T tuple = waitingEvent.getKey();
+        Slice tupleKey = getKey(tuple);
+        long tupleTime = getTime(tuple);
+        Future<Slice> future = waitingEvent.getValue();
+        if (future.isDone() || finalize) {
           try {
-            processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+            if (future.get() == null && asyncEvents.get(tupleKey) == null) {
+              managedState.put(tupleTime, tupleKey, new Slice(new byte[0]));
+              asyncEvents.put(tupleKey, tupleTime);
+              processUnique(tuple);
+            } else {
+              processDuplicate(tuple);
+            }
           } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException("handle idle time", e);
           }
           waitIterator.remove();
         }
-      }
-    } else {
-      /* nothing to do here, so sleep for a while to avoid busy loop */
-      try {
-        Thread.sleep(sleepMillis);
-      } catch (InterruptedException ie) {
-        throw new RuntimeException(ie);
+        if (!finalize) {
+          break;
+        }
       }
     }
   }
 
-  protected void processEvent(T tuple, Slice value)
+  @Override
+  public void endWindow()
   {
-    if (value == BucketedState.EXPIRED) {
-      return;
-    }
-    if (value == null) {
-      //not a duplicate event
-      output.emit(tuple);
-    } else {
-      if (duplicates.isConnected()) {
-        duplicates.emit(tuple);
-      }
+    processAuxiliary(true);
+    if (preserveTupleOrder) {
+      emitProcessedTuples();
     }
+    Preconditions.checkArgument(waitingEvents.isEmpty());
+    asyncEvents.clear();
+    managedState.endWindow();
   }
 
-  @Override
-  public void endWindow()
+  /**
+   * Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained.
+   *
+   * @param tuple the incoming tuple
+   * @param d The decision for the tuple
+   */
+  protected void recordDecision(T tuple, Decision d)
   {
-    Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
-    while (waitIterator.hasNext()) {
-      Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
-      try {
-        processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException("end window", e);
-      }
-      waitIterator.remove();
+    decisions.put(tuple, d);
+  }
 
+  /**
+   * Processes tuples for which the decision (unique / duplicate / expired) has been made.
+   * Breaks once an undecided tuple is found, as we don't want to emit tuples out of order.
+   */
+  protected void emitProcessedTuples()
+  {
+    Iterator<Entry<T, Decision>> entries = decisions.entrySet().iterator();
+    while (entries.hasNext()) {
+      Entry<T, Decision> td = entries.next();
+      switch (td.getValue()) {
+        case UNIQUE:
+          uniqueEvents++;
+          emitUnique(td.getKey());
+          entries.remove();
+          break;
+        case DUPLICATE:
+          duplicateEvents++;
+          emitDuplicate(td.getKey());
+          entries.remove();
+          break;
+        case EXPIRED:
+          expiredEvents++;
+          emitExpired(td.getKey());
+          entries.remove();
+          break;
+        default:
+          /*
+           * Decision for this tuple is still UNKNOWN; it is waiting for its bucket to be loaded.
+           * Return so that no later tuple is emitted out of order.
+           */
+          return;
+      }
     }
-    managedState.endWindow();
   }
 
   @Override
@@ -174,4 +407,49 @@ public abstract class AbstractDeduper<T> implements Operator, Operator.Checkpoin
   {
     managedState.committed(windowId);
   }
+
+  protected void emitUnique(T event)
+  {
+    unique.emit(event);
+  }
+
+  protected void emitDuplicate(T event)
+  {
+    duplicate.emit(event);
+  }
+
+  protected void emitExpired(T event)
+  {
+    expired.emit(event);
+  }
+
+  /**
+   * Checks whether the output of the deduper preserves the input order
+   */
+  public boolean isOrderedOutput()
+  {
+    return preserveTupleOrder;
+  }
+
+  /**
+   * If set to true, the deduper will emit tuples in the order in which they were received; tuples which arrived
+   * later will wait for previous tuples to get processed and emitted. If set to false, tuples are emitted as and
+   * when they get processed, possibly out of order.
+   *
+   * @param preserveTupleOrder whether or not to preserve the order of incoming tuples
+   */
+  public void setPreserveTupleOrder(boolean preserveTupleOrder)
+  {
+    this.preserveTupleOrder = preserveTupleOrder;
+  }
+
+  /**
+   * Enum for holding all possible values for a decision for a tuple
+   */
+  protected enum Decision
+  {
+    UNIQUE, DUPLICATE, EXPIRED, UNKNOWN
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractDeduper.class);
 }

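The ordered-emission machinery added above reduces to a small pattern: record a decision per tuple in arrival order, then drain decided tuples until the first undecided one. Below is a simplified standalone sketch of that pattern, with names of our own choosing rather than the operator's:

  import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.Map;

  class DecisionQueue<T>
  {
    enum Decision { UNIQUE, DUPLICATE, EXPIRED, UNKNOWN }

    // LinkedHashMap preserves arrival order; re-putting an existing key when
    // its decision resolves keeps the tuple in its original position.
    private final Map<T, Decision> decisions = new LinkedHashMap<>();

    void record(T tuple, Decision d)
    {
      decisions.put(tuple, d);
    }

    // Emit decided tuples in arrival order; stop at the first UNKNOWN so
    // nothing is emitted out of order.
    void drain()
    {
      Iterator<Map.Entry<T, Decision>> it = decisions.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<T, Decision> e = it.next();
        if (e.getValue() == Decision.UNKNOWN) {
          return;
        }
        emit(e.getKey(), e.getValue());
        it.remove();
      }
    }

    void emit(T tuple, Decision d)
    {
      System.out.println(d + ": " + tuple);  // stand-in for port emission
    }
  }
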
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
new file mode 100644
index 0000000..d40a550
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * A {@link StreamCodec} for {@link AbstractDeduper}.
+ * This helps in partitioning the tuples based on the key field in the tuple.
+ * The {@link #getPartition(Object)} function returns the hash code of the key field.
+ *
+ */
+@Evolving
+public class DeduperStreamCodec extends KryoSerializableStreamCodec<Object>
+{
+  private static final long serialVersionUID = -6904078808859412149L;
+
+  private transient Getter<Object, Object> getter;
+  private String keyExpression;
+
+  public DeduperStreamCodec(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  @Override
+  public int getPartition(Object t)
+  {
+    if (getter == null) {
+      getter = PojoUtils.createGetter(t.getClass(), keyExpression, Object.class);
+    }
+    return getter.get(t).hashCode();
+  }
+}

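The codec's effect is that all tuples with the same key land on the same physical partition of the deduper. A rough usage sketch follows; the POJO and the mask value are illustrative assumptions (the platform derives the actual mask from its partition keys):

  // Hypothetical POJO; only the "key" getter matters to the codec.
  public class Click
  {
    private String key;

    public String getKey()
    {
      return key;
    }

    public void setKey(String key)
    {
      this.key = key;
    }
  }

  DeduperStreamCodec codec = new DeduperStreamCodec("key");
  Click c = new Click();
  c.setKey("user-42");
  int raw = codec.getPartition(c);  // hash code of c.getKey()
  int partition = raw & 0x03;       // illustrative mask for 4 partitions
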
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
new file mode 100644
index 0000000..6aebe6b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * The time-based deduper de-duplicates incoming POJO tuples and classifies them as one of the following:
+ * 1. Unique
+ * 2. Duplicate
+ * 3. Expired
+ *
+ * Since this de-duplicates a stream of tuples, and we cannot store all incoming keys indefinitely,
+ * we use the concept of expiry, where incoming tuples expire after a specified period of time. In this case,
+ * we choose to expire an entire bucket of data as a unit. This requires the user to specify the bucketing
+ * structure in advance in order for the operator to function. Here are the parameters for specifying the
+ * bucketing structure:
+ * 1. {@link #expireBefore} (in seconds) - The total time period during which a tuple stays in the
+ * system and blocks any other tuple with the same key.
+ * 2. {@link #bucketSpan} (in seconds) - The unit which describes how large a bucket can be.
+ * Typically this should be defined depending on the use case. For example, if {@link #expireBefore} is
+ * set to 1 hour, then data would typically be grouped in the order of minutes, so a {@link #bucketSpan} of
+ * around 1 minute or 5 minutes would make sense. Note that in this case, an entire 1-minute or
+ * 5-minute bucket's worth of data expires as a whole. Setting it to 1 minute makes the number of time
+ * buckets in the system 1 hour / 1 minute = 60. Similarly, setting {@link #bucketSpan} to 5 minutes makes
+ * the number of buckets 12. Note that having too many or too few buckets can have a performance impact.
+ * If unsure, set {@link #bucketSpan} to ~ sqrt({@link #expireBefore}); this balances the number of buckets
+ * against the bucket span.
+ * 3. {@link #referenceInstant} - The reference point from which the time used for expiry is measured.
+ * Setting {@link #referenceInstant} to, say, r seconds from the epoch initializes the start of the expiry
+ * window at instant r. The start and end of the expiry window periodically move forward by the span of a
+ * single bucket. Refer to {@link TimeBucketAssigner} for details.
+ *
+ * Additionally, it needs the following parameters:
+ * 1. {@link #keyExpression} - The java expression to extract the key fields from the incoming tuple (POJO)
+ * 2. {@link #timeExpression} - The java expression to extract the time field from the incoming tuple (POJO).
+ * If there is no time field in the tuple, the system time at which the tuple is processed is used.
+ *
+ */
+@Evolving
+public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements ActivationListener<Context>
+{
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  private String timeExpression;
+
+  @NotNull
+  private long bucketSpan;
+
+  @NotNull
+  private long expireBefore;
+
+  // Optional
+  private long referenceInstant = new Instant().getMillis() / 1000;
+
+  private transient Class<?> pojoClass;
+
+  private transient Getter<Object, Long> timeGetter;
+
+  private transient Getter<Object, Object> keyGetter;
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return getDeduperStreamCodec();
+    }
+  };
+
+  @Override
+  protected long getTime(Object tuple)
+  {
+    if (timeGetter != null) {
+      return timeGetter.get(tuple);
+    }
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  protected Slice getKey(Object tuple)
+  {
+    Object key = keyGetter.get(tuple);
+    return new Slice(key.toString().getBytes());
+  }
+
+  protected StreamCodec<Object> getDeduperStreamCodec()
+  {
+    return new DeduperStreamCodec(keyExpression);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan));
+    timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore));
+    timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000));
+    managedState.setTimeBucketAssigner(timeBucketAssigner);
+    super.setup(context);
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    if (timeExpression != null) {
+      timeGetter = PojoUtils.createGetter(pojoClass, timeExpression, Long.class);
+    } else {
+      timeGetter = null;
+    }
+    keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  public String getKeyExpression()
+  {
+    return keyExpression;
+  }
+
+  /**
+   * Sets the key expression
+   * @param keyExpression
+   */
+  public void setKeyExpression(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  public String getTimeExpression()
+  {
+    return timeExpression;
+  }
+
+  /**
+   * Sets the time expression
+   * @param timeExpression
+   */
+  public void setTimeExpression(String timeExpression)
+  {
+    this.timeExpression = timeExpression;
+  }
+
+  public long getBucketSpan()
+  {
+    return bucketSpan;
+  }
+
+  /**
+   * Sets the length of a single time bucket (in seconds)
+   * @param bucketSpan
+   */
+  public void setBucketSpan(long bucketSpan)
+  {
+    this.bucketSpan = bucketSpan;
+  }
+
+  public long getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the expiry time (in seconds). Any event with time before this is considered to be expired.
+   * @param expireBefore
+   */
+  public void setExpireBefore(long expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+
+  public long getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (in seconds from the epoch).
+   * By default this is the time when the application is started.
+   * @param referenceInstant
+   */
+  public void setReferenceInstant(long referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+}

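To make the bucketing arithmetic from the class javadoc concrete, here is a hedged configuration sketch using the setters above; the key and time expressions match the TestPojo used in the tests below:

  TimeBasedDedupOperator dedup = new TimeBasedDedupOperator();
  dedup.setKeyExpression("key");              // field holding the dedup key
  dedup.setTimeExpression("date.getTime()");  // event time in milliseconds
  dedup.setExpireBefore(3600);                // a tuple blocks duplicates for 1 hour
  dedup.setBucketSpan(60);                    // 1-minute buckets: 3600 / 60 = 60 buckets
  // sqrt heuristic: sqrt(3600) = 60, so a 60-second span balances the number
  // of buckets against the size of each bucket for this expiry period.
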
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
new file mode 100644
index 0000000..544c1df
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperOrderingTest
+{
+  public static boolean testFailed = false;
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    DeduperOrderingTestApp app = new DeduperOrderingTestApp();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    app.verifier.latch.await();
+    Assert.assertFalse(testFailed);
+    lc.shutdown();
+  }
+
+  public static class DeduperOrderingTestApp implements StreamingApplication
+  {
+    Verifier verifier;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      RandomDedupDataGenerator random = dag.addOperator("Input", RandomDedupDataGenerator.class);
+
+      TimeBasedDedupOperator dedup = dag.addOperator("Dedup", TimeBasedDedupOperator.class);
+      dedup.setKeyExpression("key");
+      dedup.setTimeExpression("date.getTime()");
+      dedup.setBucketSpan(10);
+      dedup.setExpireBefore(60);
+      dedup.setPreserveTupleOrder(true);
+      FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+      fAccessImpl.setBasePath(dag.getAttributes().get(DAG.APPLICATION_PATH) + "/bucket_data");
+      dedup.managedState.setFileAccess(fAccessImpl);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestPojo.class);
+
+      verifier = dag.addOperator("Verifier", Verifier.class);
+
+      dag.addStream("Input to Dedup", random.output, dedup.input);
+      dag.addStream("Dedup to Unique", dedup.unique, verifier.unique);
+      dag.addStream("Dedup to Duplicate", dedup.duplicate, verifier.duplicate);
+      dag.addStream("Dedup to Expired", dedup.expired, verifier.expired);
+    }
+  }
+
+  public static class RandomDedupDataGenerator extends BaseOperator implements InputOperator
+  {
+    private final long count = 500;
+    private long windowCount = 0;
+    private long sequenceId = 0;
+
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      windowCount = 0;
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (windowCount < count) {
+        TestPojo pojo = new TestPojo(sequenceId, new Date(), sequenceId);
+        output.emit(pojo);
+        sequenceId++;
+      }
+    }
+  }
+
+  public static class Verifier extends BaseOperator implements StatsListener
+  {
+    long prevSequence = 0;
+
+    public transient CountDownLatch latch = new CountDownLatch(1);
+    @AutoMetric
+    int count = 0;
+    public final transient DefaultInputPort<Object> unique = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+        TestPojo pojo = (TestPojo)tuple;
+        if (pojo.getSequence() < prevSequence) {
+          testFailed = true;
+        }
+        count++;
+        prevSequence = pojo.sequence;
+      }
+    };
+
+    public final transient DefaultInputPort<Object> duplicate = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+        TestPojo pojo = (TestPojo)tuple;
+        if (pojo.getSequence() < prevSequence) {
+          testFailed = true;
+        }
+        count++;
+        prevSequence = pojo.sequence;
+      }
+    };
+
+    public final transient DefaultInputPort<Object> expired = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+        TestPojo pojo = (TestPojo)tuple;
+        if (pojo.getSequence() < prevSequence) {
+          testFailed = true;
+        }
+        count++;
+        prevSequence = pojo.sequence;
+      }
+    };
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+      count = (Integer)operatorStats.metrics.get("count");
+      if (count >= 1000) {
+        latch.countDown();
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
new file mode 100644
index 0000000..ebe5a3e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned
+ * The partitioning in Dedup is overridden by partitioning on the basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+  public static final int NUM_DEDUP_PARTITIONS = 5;
+  private static boolean testFailed = false;
+
+  /**
+   * Application to test the partitioning
+   *
+   */
+  public static class TestDedupApp implements StreamingApplication
+  {
+    TestDeduper dedup;
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+      dedup = dag.addOperator("Deduper", new TestDeduper());
+      dedup.setKeyExpression("id");
+      dedup.setTimeExpression("eventTime.getTime()");
+      dedup.setBucketSpan(60);
+      dedup.setExpireBefore(600);
+      
+      ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+      dag.addStream("Generator to Dedup", gen.output, dedup.input);
+      dag.addStream("Dedup to Console", dedup.unique, console.input);
+      dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+      dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, 
+          new StatelessPartitioner<TimeBasedDedupOperator>(NUM_DEDUP_PARTITIONS));
+    }
+  }
+
+  public static class TestDeduper extends TimeBasedDedupOperator implements StatsListener
+  {
+    int operatorId;
+    HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+    transient CountDownLatch latch = new CountDownLatch(1);
+    int tuplesProcessed = 0;
+    @AutoMetric
+    int tuplesProcessedCompletely = 0;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    protected void processTuple(Object tuple)
+    {
+      TestEvent event = (TestEvent)tuple;
+      if (partitionMap.containsKey(event.id)) {
+        if (partitionMap.get(event.id) != operatorId) {
+          testFailed = true;
+          throw new RuntimeException("Wrong tuple assignment");
+        }
+      } else {
+        partitionMap.put(event.id, operatorId);
+      }
+      tuplesProcessed++;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      tuplesProcessedCompletely = tuplesProcessed;
+    }
+
+    @Override
+    public Response processStats(BatchedOperatorStats stats)
+    {
+      Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+      tuplesProcessedCompletely = (Integer)operatorStats.metrics.get("tuplesProcessedCompletely");
+      if (tuplesProcessedCompletely >= 1000) {
+        latch.countDown();
+      }
+      return null;
+    }
+  }
+
+  public static class TestGenerator extends BaseOperator implements InputOperator
+  {
+
+    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+    private final transient Random r = new Random();
+
+    @Override
+    public void emitTuples()
+    {
+      TestEvent event = new TestEvent();
+      event.id = r.nextInt(100);
+      output.emit(event);
+    }
+  }
+
+  public static class TestEvent
+  {
+    private int id;
+    private Date eventTime;
+
+    public TestEvent()
+    {
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public Date getEventTime()
+    {
+      return eventTime;
+    }
+
+    public void setEventTime(Date eventTime)
+    {
+      this.eventTime = eventTime;
+    }
+  }
+
+  /**
+   * This test validates whether a tuple key goes to exactly one partition
+   */
+  @Test
+  public void testDeduperStreamCodec() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    TestDedupApp app = new TestDedupApp();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+    app.dedup.latch.await();
+    Assert.assertFalse(testFailed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
new file mode 100644
index 0000000..4b25341
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class DeduperTimeBasedPOJOImplTest
+{
+  private static String applicationPath;
+  private static final String APPLICATION_PATH_PREFIX = "target/DeduperPOJOImplTest";
+  private static final String APP_ID = "DeduperPOJOImplTest";
+  private static final int OPERATOR_ID = 0;
+  private static TimeBasedDedupOperator deduper;
+
+  @Before
+  public void setup()
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    deduper = new TimeBasedDedupOperator();
+    deduper.setKeyExpression("key");
+    deduper.setTimeExpression("date.getTime()");
+    deduper.setBucketSpan(10);
+    deduper.setExpireBefore(60);
+    FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+    fAccessImpl.setBasePath(applicationPath + "/bucket_data");
+    deduper.managedState.setFileAccess(fAccessImpl);
+  }
+
+  @Test
+  public void testDedup()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    deduper.input.setup(new PortContext(attributes, context));
+    deduper.activate(context);
+    CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.unique, uniqueSink);
+    CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.duplicate, duplicateSink);
+    CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.expired, expiredSink);
+
+    deduper.beginWindow(0);
+
+    long millis = System.currentTimeMillis();
+    for (int i = 0; i < 100; i++) {
+      TestPojo pojo = new TestPojo(i, new Date(millis + i));
+      deduper.input.process(pojo);
+    }
+    TestPojo expiredPojo = new TestPojo(100, new Date(millis - 1000 * 60));
+    deduper.input.process(expiredPojo);
+    for (int i = 90; i < 200; i++) {
+      TestPojo pojo = new TestPojo(i, new Date(millis + i));
+      deduper.input.process(pojo);
+    }
+    deduper.handleIdleTime();
+    deduper.endWindow();
+    Assert.assertTrue(uniqueSink.collectedTuples.size() == 200);
+    Assert.assertTrue(duplicateSink.collectedTuples.size() == 10);
+    Assert.assertTrue(expiredSink.collectedTuples.size() == 1);
+
+    deduper.teardown();
+  }
+
+  @After
+  public void teardown()
+  {
+    Path root = new Path(applicationPath);
+    try {
+      FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
new file mode 100644
index 0000000..6311517
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+
+public class TestPojo
+{
+  private long key;
+  private Date date;
+  public long sequence;
+
+  public TestPojo()
+  {
+  }
+
+  public TestPojo(long key, Date date)
+  {
+    this.key = key;
+    this.date = date;
+  }
+
+  public TestPojo(long key, Date date, long sequence)
+  {
+    this.key = key;
+    this.date = date;
+    this.sequence = sequence;
+  }
+
+  public long getKey()
+  {
+    return key;
+  }
+
+  public Date getDate()
+  {
+    return date;
+  }
+
+  public void setKey(long key)
+  {
+    this.key = key;
+  }
+
+  public void setDate(Date date)
+  {
+    this.date = date;
+  }
+
+  public long getSequence()
+  {
+    return sequence;
+  }
+
+  public void setSequence(long sequence)
+  {
+    this.sequence = sequence;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TestPojo [key=" + key + ", date=" + date + ", sequence=" + sequence + "]";
+  }
+
+}