You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/05 05:32:04 UTC
[1/2] apex-malhar git commit: APEXMALHAR-1701 An abstract deduper
implemenatation
Repository: apex-malhar
Updated Branches:
refs/heads/master 0ec1433b2 -> 7dea3d0a0
APEXMALHAR-1701 An abstract deduper implemenatation
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a1f62669
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a1f62669
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a1f62669
Branch: refs/heads/master
Commit: a1f62669aaa46d2e38c42a8755baf4dc66d3246f
Parents: 0ec1433
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Apr 29 11:43:33 2016 -0700
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Tue Aug 2 17:29:22 2016 +0530
----------------------------------------------------------------------
.../apex/malhar/lib/dedup/AbstractDeduper.java | 177 +++++++++++++++++++
1 file changed, 177 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a1f62669/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
new file mode 100644
index 0000000..d23a28a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An operator that de-dupes a stream.
+ *
+ * @param <T> type of events
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener,
+ Operator.IdleTimeHandler
+{
+ @NotNull
+ protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+
+ private transient long sleepMillis;
+
+ private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap();
+
+ public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
+
+ public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>();
+
+ public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ long time = getTime(tuple);
+ Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+ if (valFuture.isDone()) {
+ try {
+ processEvent(tuple, valFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("process", e);
+ }
+ } else {
+ waitingEvents.put(tuple, valFuture);
+ }
+ }
+ };
+
+ protected abstract long getTime(T tuple);
+
+ protected abstract Slice getKey(T tuple);
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ managedState.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ managedState.beginWindow(windowId);
+ }
+
+ @Override
+ public void handleIdleTime()
+ {
+ if (waitingEvents.size() > 0) {
+ Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
+ while (waitIterator.hasNext()) {
+ Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
+ if (waitingEvent.getValue().isDone()) {
+ try {
+ processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("handle idle time", e);
+ }
+ waitIterator.remove();
+ }
+ }
+ } else {
+ /* nothing to do here, so sleep for a while to avoid busy loop */
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ }
+ }
+
+ protected void processEvent(T tuple, Slice value)
+ {
+ if (value == BucketedState.EXPIRED) {
+ return;
+ }
+ if (value == null) {
+ //not a duplicate event
+ output.emit(tuple);
+ } else {
+ if (duplicates.isConnected()) {
+ duplicates.emit(tuple);
+ }
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
+ while (waitIterator.hasNext()) {
+ Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
+ try {
+ processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("end window", e);
+ }
+ waitIterator.remove();
+
+ }
+ managedState.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ managedState.teardown();
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ managedState.beforeCheckpoint(windowId);
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ managedState.checkpointed(windowId);
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ managedState.committed(windowId);
+ }
+}
[2/2] apex-malhar git commit: APEXMALHAR-1701: Added features to
AbstractDeduper. Added Pojo implementation. Added unit tests.
Posted by ch...@apache.org.
APEXMALHAR-1701: Added features to AbstractDeduper. Added Pojo implementation. Added unit tests.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7dea3d0a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7dea3d0a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7dea3d0a
Branch: refs/heads/master
Commit: 7dea3d0a06875cb30cd5dd9f5d6e353f909fc485
Parents: a1f6266
Author: bhupesh <bh...@gmail.com>
Authored: Thu Jul 7 15:52:34 2016 +0530
Committer: bhupeshchawda <bh...@gmail.com>
Committed: Tue Aug 2 19:59:10 2016 +0530
----------------------------------------------------------------------
.../apex/malhar/lib/dedup/AbstractDeduper.java | 402 ++++++++++++++++---
.../malhar/lib/dedup/DeduperStreamCodec.java | 55 +++
.../lib/dedup/TimeBasedDedupOperator.java | 239 +++++++++++
.../malhar/lib/dedup/DeduperOrderingTest.java | 176 ++++++++
.../lib/dedup/DeduperPartitioningTest.java | 195 +++++++++
.../lib/dedup/DeduperTimeBasedPOJOImplTest.java | 116 ++++++
.../apache/apex/malhar/lib/dedup/TestPojo.java | 82 ++++
7 files changed, 1203 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index d23a28a..d06acc3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -20,135 +20,368 @@ package org.apache.apex.malhar.lib.dedup;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.netlet.util.Slice;
/**
- * An operator that de-dupes a stream.
+ * Abstract class which allows de-duplicating incoming tuples based on a configured key.
+ * Also supports expiry mechanism based on a configurable expiry period configured using {@link TimeBucketAssigner}
+ * in {@link ManagedTimeUnifiedStateImpl}
+ * Following steps are used in identifying the state of a particular tuple:
+ * 1. Check if the time of the tuple falls in an expired bucket. If so, the tuple is expired
+ * 2. If the tuple is a valid event, it is checked in the store whether the same key already exists in the
+ * time bucket identified by the event time. If, so, the tuple is a duplicate.
+ * 3. Otherwise the tuple is a unique tuple.
*
* @param <T> type of events
*/
+@Evolving
@OperatorAnnotation(checkpointableWithinAppWindow = false)
-public abstract class AbstractDeduper<T> implements Operator, Operator.CheckpointNotificationListener,
- Operator.IdleTimeHandler
+public abstract class AbstractDeduper<T>
+ implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener
{
+ /**
+ * The input port on which events are received.
+ */
+ public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public final void process(T tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ /**
+ * The output port on which deduped events are emitted.
+ */
+ public final transient DefaultOutputPort<T> unique = new DefaultOutputPort<>();
+
+ /**
+ * The output port on which duplicate events are emitted.
+ */
+ public final transient DefaultOutputPort<T> duplicate = new DefaultOutputPort<>();
+
+ /**
+ * The output port on which expired events are emitted.
+ */
+ public final transient DefaultOutputPort<T> expired = new DefaultOutputPort<>();
+
+ /**
+ * Whether or not the order of tuples be maintained.
+ * Making this "true" might entail some cost in performance, but makes the operator idempotent.
+ */
+ private boolean preserveTupleOrder = true;
+
@NotNull
protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
- private transient long sleepMillis;
-
+ /**
+ * Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
+ * tuples get processed. This is used only when {@link #preserveTupleOrder} is true.
+ */
+ private transient Map<T, Decision> decisions;
private transient Map<T, Future<Slice>> waitingEvents = Maps.newLinkedHashMap();
+ private transient Map<Slice, Long> asyncEvents = Maps.newLinkedHashMap();
+
+ // Metrics
+ @AutoMetric
+ private transient long uniqueEvents;
+ @AutoMetric
+ private transient long duplicateEvents;
+ @AutoMetric
+ private transient long expiredEvents;
- public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
+ @Override
+ public void setup(OperatorContext context)
+ {
+ FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+ fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data");
+ managedState.setFileAccess(fAccessImpl);
+ managedState.setup(context);
- public final transient DefaultOutputPort<T> duplicates = new DefaultOutputPort<>();
+ if (preserveTupleOrder) {
+ decisions = Maps.newLinkedHashMap();
+ }
+ }
- public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ @Override
+ public void beginWindow(long l)
{
- @Override
- public void process(T tuple)
- {
- long time = getTime(tuple);
- Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
- if (valFuture.isDone()) {
- try {
- processEvent(tuple, valFuture.get());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("process", e);
- }
+ // Reset Metrics
+ uniqueEvents = 0;
+ duplicateEvents = 0;
+ expiredEvents = 0;
+
+ managedState.beginWindow(l);
+ }
+
+ protected abstract Slice getKey(T event);
+
+ protected abstract long getTime(T event);
+
+ /**
+ * Processes an incoming tuple
+ *
+ * @param tuple the incoming tuple
+ */
+ protected void processTuple(T tuple)
+ {
+
+ long time = getTime(tuple);
+ Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+
+ if (valFuture.isDone()) {
+ try {
+ processEvent(tuple, valFuture.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ processWaitingEvent(tuple, valFuture);
+ }
+ }
+
+ /**
+ * Processes a looked-up event
+ *
+ * @param tuple the incoming tuple
+ * @param value the looked up key of the tuple
+ */
+ protected void processEvent(T tuple, Slice value)
+ {
+ if (value == BucketedState.EXPIRED) {
+ processInvalid(tuple);
+ return;
+ }
+ processValid(tuple, value);
+ }
+
+ /**
+ * Processes a tuple which is waiting for the lookup to return.
+ *
+ * @param tuple The tuple which needs to wait
+ * @param future The future object which will ultimately return the lookup result
+ */
+ protected void processWaitingEvent(T tuple, Future<Slice> future)
+ {
+ waitingEvents.put(tuple, future);
+ if (preserveTupleOrder) {
+ recordDecision(tuple, Decision.UNKNOWN);
+ }
+ }
+
+ /**
+ * Processes a valid (non-expired) tuple. This tuple may be a unique or a duplicate.
+ *
+ * @param tuple
+ * The tuple to be processed
+ * @param value
+ * Looked up key of the tuple
+ */
+ protected void processValid(T tuple, Slice value)
+ {
+ if (!preserveTupleOrder || waitingEvents.isEmpty()) {
+ if (value == null) {
+ managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+ processUnique(tuple);
} else {
- waitingEvents.put(tuple, valFuture);
+ processDuplicate(tuple);
}
+ } else {
+ processWaitingEvent(tuple, Futures.immediateFuture(value));
}
- };
+ }
- protected abstract long getTime(T tuple);
+ /**
+ * Processes invalid tuples.
+ *
+ * @param tuple the incoming tuple
+ */
+ protected void processInvalid(T tuple)
+ {
+ if (preserveTupleOrder && !decisions.isEmpty()) {
+ recordDecision(tuple, Decision.EXPIRED);
+ } else {
+ processExpired(tuple);
+ }
+ }
- protected abstract Slice getKey(T tuple);
+ /**
+ * Processes an expired tuple
+ *
+ * @param tuple the incoming tuple
+ */
+ protected void processExpired(T tuple)
+ {
+ expiredEvents++;
+ emitExpired(tuple);
+ }
- @Override
- public void setup(Context.OperatorContext context)
+ /**
+ * Processes the duplicate tuple.
+ *
+ * @param tuple
+ * The tuple which is a duplicate
+ */
+ protected void processDuplicate(T tuple)
{
- sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
- managedState.setup(context);
+ if (preserveTupleOrder && !decisions.isEmpty()) {
+ recordDecision(tuple, Decision.DUPLICATE);
+ } else {
+ duplicateEvents++;
+ emitDuplicate(tuple);
+ }
}
- @Override
- public void beginWindow(long windowId)
+ /**
+ * Processes the unique tuple.
+ *
+ * @param tuple
+ * The tuple which is a unique
+ */
+ protected void processUnique(T tuple)
{
- managedState.beginWindow(windowId);
+ if (preserveTupleOrder && !decisions.isEmpty()) {
+ recordDecision(tuple, Decision.UNIQUE);
+ } else {
+ uniqueEvents++;
+ emitUnique(tuple);
+ }
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void handleIdleTime()
{
+ if (preserveTupleOrder) {
+ emitProcessedTuples();
+ }
+ processAuxiliary(false);
+ }
+
+ /**
+ * Does any auxiliary processing in the idle time of the operator.
+ * Processes any tuples which are waiting for the lookup to return.
+ *
+ * @param finalize Whether or not to wait for future to return
+ */
+ protected void processAuxiliary(boolean finalize)
+ {
if (waitingEvents.size() > 0) {
Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
while (waitIterator.hasNext()) {
Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
- if (waitingEvent.getValue().isDone()) {
+ T tuple = waitingEvent.getKey();
+ Slice tupleKey = getKey(tuple);
+ long tupleTime = getTime(tuple);
+ Future<Slice> future = waitingEvent.getValue();
+ if (future.isDone() || finalize ) {
try {
- processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
+ if (future.get() == null && asyncEvents.get(tupleKey) == null) {
+ managedState.put(tupleTime, tupleKey, new Slice(new byte[0]));
+ asyncEvents.put(tupleKey, tupleTime);
+ processUnique(tuple);
+ } else {
+ processDuplicate(tuple);
+ }
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("handle idle time", e);
}
waitIterator.remove();
}
- }
- } else {
- /* nothing to do here, so sleep for a while to avoid busy loop */
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
+ if (!finalize) {
+ break;
+ }
}
}
}
- protected void processEvent(T tuple, Slice value)
+ @Override
+ public void endWindow()
{
- if (value == BucketedState.EXPIRED) {
- return;
- }
- if (value == null) {
- //not a duplicate event
- output.emit(tuple);
- } else {
- if (duplicates.isConnected()) {
- duplicates.emit(tuple);
- }
+ processAuxiliary(true);
+ if (preserveTupleOrder) {
+ emitProcessedTuples();
}
+ Preconditions.checkArgument(waitingEvents.isEmpty());
+ asyncEvents.clear();
+ managedState.endWindow();
}
- @Override
- public void endWindow()
+ /**
+ * Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained.
+ *
+ * @param tuple the incoming tuple
+ * @param d The decision for the tuple
+ */
+ protected void recordDecision(T tuple, Decision d)
{
- Iterator<Map.Entry<T, Future<Slice>>> waitIterator = waitingEvents.entrySet().iterator();
- while (waitIterator.hasNext()) {
- Map.Entry<T, Future<Slice>> waitingEvent = waitIterator.next();
- try {
- processEvent(waitingEvent.getKey(), waitingEvent.getValue().get());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("end window", e);
- }
- waitIterator.remove();
+ decisions.put(tuple, d);
+ }
+ /**
+ * Processes tuples for which the decision (unique / duplicate / expired) has been made.
+ * Breaks once an undecided tuple is found, as we don't want to emit out of order
+ */
+ protected void emitProcessedTuples()
+ {
+ Iterator<Entry<T, Decision>> entries = decisions.entrySet().iterator();
+ while (entries.hasNext()) {
+ Entry<T, Decision> td = entries.next();
+ switch (td.getValue()) {
+ case UNIQUE:
+ uniqueEvents++;
+ emitUnique(td.getKey());
+ entries.remove();
+ break;
+ case DUPLICATE:
+ duplicateEvents++;
+ emitDuplicate(td.getKey());
+ entries.remove();
+ break;
+ case EXPIRED:
+ expiredEvents++;
+ emitExpired(td.getKey());
+ entries.remove();
+ break;
+ default:
+ /*
+ * Decision for this is still UNKNOWN. Tuple is still waiting for bucket to be loaded. Break.
+ */
+ break;
+ }
}
- managedState.endWindow();
}
@Override
@@ -174,4 +407,49 @@ public abstract class AbstractDeduper<T> implements Operator, Operator.Checkpoin
{
managedState.committed(windowId);
}
+
+ protected void emitUnique(T event)
+ {
+ unique.emit(event);
+ }
+
+ protected void emitDuplicate(T event)
+ {
+ duplicate.emit(event);
+ }
+
+ protected void emitExpired(T event)
+ {
+ expired.emit(event);
+ }
+
+ /**
+ * Checks whether output of deduper should preserve the input order
+ */
+ public boolean isOrderedOutput()
+ {
+ return preserveTupleOrder;
+ }
+
+ /**
+ * If set to true, the deduper will emit tuples in the order in which they were received. Tuples which arrived later
+ * will wait for previous tuples to get processed and emitted. If not set, the order of tuples may change as tuples
+ * may be emitted out of order as and when they get processed.
+ *
+ * @param preserveTupleOrder whether or not to preserve the order of incoming tuples
+ */
+ public void setPreserveTupleOrder(boolean preserveTupleOrder)
+ {
+ this.preserveTupleOrder = preserveTupleOrder;
+ }
+
+ /**
+ * Enum for holding all possible values for a decision for a tuple
+ */
+ protected enum Decision
+ {
+ UNIQUE, DUPLICATE, EXPIRED, UNKNOWN
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractDeduper.class);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
new file mode 100644
index 0000000..d40a550
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/DeduperStreamCodec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+
+/**
+ * A {@link StreamCodec} for {@link AbstractDeduper}.
+ * This helps in partitioning the tuples depending on the key field in the tuple.
+ * The {@link #getPartition(Object)} function returns the hash code of the key field
+ *
+ */
+@Evolving
+public class DeduperStreamCodec extends KryoSerializableStreamCodec<Object>
+{
+ private static final long serialVersionUID = -6904078808859412149L;
+
+ private transient Getter<Object, Object> getter;
+ private String keyExpression;
+
+ public DeduperStreamCodec(String keyExpression)
+ {
+ this.keyExpression = keyExpression;
+ }
+
+ @Override
+ public int getPartition(Object t)
+ {
+ if (getter == null) {
+ getter = PojoUtils.createGetter(t.getClass(), keyExpression, Object.class);
+ }
+ return getter.get(t).hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
new file mode 100644
index 0000000..6aebe6b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Time based deduper will de-duplicate incoming POJO tuples and classify them into the following:
+ * 1. Unique
+ * 2. Duplicate
+ * 3. Expired
+ *
+ * Since this is de-duplicating in a stream of tuples, and we cannot store all incoming keys indefinitely,
+ * we use the concept of expiry, where incoming tuples expire after a specified period of time. In this case,
+ * we choose to expire an entire bucket of data as a unit. This requires the user to specify the bucketing
+ * structure in advance in order for the operator to function. Here are the parameters for specifying the
+ * bucketing structure:
+ * 1. {@link #expireBefore} (in seconds)- This is the total time period during which a tuple stays in the
+ * system and blocks any other tuple with the same key.
+ * 2. {@link #bucketSpan} (in seconds) - This is the unit which describes how large a bucket can be.
+ * Typically this should be defined depending on the use case. For example, if we have {@link #expireBefore}
+ * set to 1 hour, then typically we would be clubbing data in the order of minutes, so a {@link #bucketSpan} of
+ * around 1 minute or 5 minutes would make sense. Note that in this case, the entire data worth 1 minute or
+ * 5 minutes will expire as a whole. Setting it to 1 minute would make the number of time buckets in the system
+ * to be 1 hour / 1 minute = 60 buckets. Similarly setting {@link #bucketSpan} to 5 minutes would make number
+ * of buckets to be 12. Note that having too many or too less buckets could have a performance impact. If unsure,
+ * set the {@link #bucketSpan} to be ~ sqrt({@link #expireBefore}). This way the number of buckets and bucket span
+ * are balanced.
+ * 3. {@link #referenceInstant} - The reference point from which to start the time which is used for expiry.
+ * Setting the {@link #referenceInstant} to say, r seconds from the epoch, would initialize the start of expiry
+ * to be from that instant = r. The start and end of the expiry window periodically move by the span of a single
+ * bucket. Refer {@link TimeBucketAssigner} for details.
+ *
+ * Additionally, it also needs the following parameters:
+ * 1. {@link #keyExpression} - The java expression to extract the key fields in the incoming tuple (POJO)
+ * 2. {@link #timeExpression} - The java expression to extract the time field in the incoming tuple (POJO).
+ * In case there is no time field in the tuple, system time, when the tuple is processed, will be used.
+ *
+ */
+@Evolving
+public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements ActivationListener<Context>
+{
+
+ // Required properties
+ @NotNull
+ private String keyExpression;
+
+ private String timeExpression;
+
+ @NotNull
+ private long bucketSpan;
+
+ @NotNull
+ private long expireBefore;
+
+ // Optional
+ private long referenceInstant = new Instant().getMillis() / 1000;
+
+ private transient Class<?> pojoClass;
+
+ private transient Getter<Object, Long> timeGetter;
+
+ private transient Getter<Object, Object> keyGetter;
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(PortContext context)
+ {
+ pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object tuple)
+ {
+ processTuple(tuple);
+ }
+
+ @Override
+ public StreamCodec<Object> getStreamCodec()
+ {
+ return getDeduperStreamCodec();
+ }
+ };
+
+ @Override
+ protected long getTime(Object tuple)
+ {
+ if (timeGetter != null) {
+ return timeGetter.get(tuple);
+ }
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ protected Slice getKey(Object tuple)
+ {
+ Object key = keyGetter.get(tuple);
+ return new Slice(key.toString().getBytes());
+ }
+
+ protected StreamCodec<Object> getDeduperStreamCodec()
+ {
+ return new DeduperStreamCodec(keyExpression);
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+ timeBucketAssigner.setBucketSpan(Duration.standardSeconds(bucketSpan));
+ timeBucketAssigner.setExpireBefore(Duration.standardSeconds(expireBefore));
+ timeBucketAssigner.setReferenceInstant(new Instant(referenceInstant * 1000));
+ managedState.setTimeBucketAssigner(timeBucketAssigner);
+ super.setup(context);
+ }
+
+ @Override
+ public void activate(Context context)
+ {
+ if (timeExpression != null) {
+ timeGetter = PojoUtils.createGetter(pojoClass, timeExpression, Long.class);
+ } else {
+ timeGetter = null;
+ }
+ keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
+ }
+
+ @Override
+ public void deactivate()
+ {
+ }
+
+ public String getKeyExpression()
+ {
+ return keyExpression;
+ }
+
+ /**
+ * Sets the key expression
+ * @param keyExpression
+ */
+ public void setKeyExpression(String keyExpression)
+ {
+ this.keyExpression = keyExpression;
+ }
+
+ public String getTimeExpression()
+ {
+ return timeExpression;
+ }
+
+ /**
+ * Sets the time expression
+ * @param timeExpression
+ */
+ public void setTimeExpression(String timeExpression)
+ {
+ this.timeExpression = timeExpression;
+ }
+
+ public long getBucketSpan()
+ {
+ return bucketSpan;
+ }
+
+ /**
+ * Sets the length of a single time bucket (in seconds)
+ * @param bucketSpan
+ */
+ public void setBucketSpan(long bucketSpan)
+ {
+ this.bucketSpan = bucketSpan;
+ }
+
+ public long getExpireBefore()
+ {
+ return expireBefore;
+ }
+
+ /**
+ * Sets the expiry time (in seconds). Any event with time before this is considered to be expired.
+ * @param expireBefore
+ */
+ public void setExpireBefore(long expireBefore)
+ {
+ this.expireBefore = expireBefore;
+ }
+
+ public long getReferenceInstant()
+ {
+ return referenceInstant;
+ }
+
+ /**
+ * Sets the reference instant (in seconds from the epoch).
+ * By default this is the time when the application is started.
+ * @param referenceInstant
+ */
+ public void setReferenceInstant(long referenceInstant)
+ {
+ this.referenceInstant = referenceInstant;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
new file mode 100644
index 0000000..544c1df
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+public class DeduperOrderingTest
+{
+ public static boolean testFailed = false;
+
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ DeduperOrderingTestApp app = new DeduperOrderingTestApp();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ app.verifier.latch.await();
+ Assert.assertFalse(testFailed);
+ lc.shutdown();
+ }
+
+ public static class DeduperOrderingTestApp implements StreamingApplication
+ {
+ Verifier verifier;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ RandomDedupDataGenerator random = dag.addOperator("Input", RandomDedupDataGenerator.class);
+
+ TimeBasedDedupOperator dedup = dag.addOperator("Dedup", TimeBasedDedupOperator.class);
+ dedup.setKeyExpression("key");
+ dedup.setTimeExpression("date.getTime()");
+ dedup.setBucketSpan(10);
+ dedup.setExpireBefore(60);
+ dedup.setPreserveTupleOrder(true);
+ FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+ fAccessImpl.setBasePath(dag.getAttributes().get(DAG.APPLICATION_PATH) + "/bucket_data");
+ dedup.managedState.setFileAccess(fAccessImpl);
+ dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestPojo.class);
+
+ verifier = dag.addOperator("Verifier", Verifier.class);
+
+ dag.addStream("Input to Dedup", random.output, dedup.input);
+ dag.addStream("Dedup to Unique", dedup.unique, verifier.unique);
+ dag.addStream("Dedup to Duplicate", dedup.duplicate, verifier.duplicate);
+ dag.addStream("Dedup to Expired", dedup.expired, verifier.expired);
+ }
+ }
+
+ public static class RandomDedupDataGenerator extends BaseOperator implements InputOperator
+ {
+ private final long count = 500;
+ private long windowCount = 0;
+ private long sequenceId = 0;
+
+ public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ windowCount = 0;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (windowCount < count) {
+ TestPojo pojo = new TestPojo(sequenceId, new Date(), sequenceId);
+ output.emit(pojo);
+ sequenceId++;
+ }
+ }
+ }
+
+ public static class Verifier extends BaseOperator implements StatsListener
+ {
+ long prevSequence = 0;
+
+ public transient CountDownLatch latch = new CountDownLatch(1);
+ @AutoMetric
+ int count = 0;
+ public final transient DefaultInputPort<Object> unique = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ TestPojo pojo = (TestPojo)tuple;
+ if (pojo.getSequence() < prevSequence) {
+ testFailed = true;
+ }
+ count++;
+ prevSequence = pojo.sequence;
+ }
+ };
+
+ public final transient DefaultInputPort<Object> duplicate = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ TestPojo pojo = (TestPojo)tuple;
+ if (pojo.getSequence() < prevSequence) {
+ testFailed = true;
+ }
+ count++;
+ prevSequence = pojo.sequence;
+ }
+ };
+
+ public final transient DefaultInputPort<Object> expired = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ TestPojo pojo = (TestPojo)tuple;
+ if (pojo.getSequence() < prevSequence) {
+ testFailed = true;
+ }
+ count++;
+ prevSequence = pojo.sequence;
+ }
+ };
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+ count = (Integer)operatorStats.metrics.get("count");
+ if (count >= 1000) {
+ latch.countDown();
+ }
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
new file mode 100644
index 0000000..ebe5a3e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperPartitioningTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Tests whether the operator functions correctly when partitioned
+ * The partitioning in Dedup is overridden by partitioning on basis of the key in the tuple.
+ *
+ */
+public class DeduperPartitioningTest
+{
+ public static final int NUM_DEDUP_PARTITIONS = 5;
+ private static boolean testFailed = false;
+
+ /**
+ * Application to test the partitioning
+ *
+ */
+ public static class TestDedupApp implements StreamingApplication
+ {
+ TestDeduper dedup;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
+
+ dedup = dag.addOperator("Deduper", new TestDeduper());
+ dedup.setKeyExpression("id");
+ dedup.setTimeExpression("eventTime.getTime()");
+ dedup.setBucketSpan(60);
+ dedup.setExpireBefore(600);
+
+ ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
+ dag.addStream("Generator to Dedup", gen.output, dedup.input);
+ dag.addStream("Dedup to Console", dedup.unique, console.input);
+ dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+ dag.setOutputPortAttribute(dedup.unique, Context.PortContext.TUPLE_CLASS, TestEvent.class);
+ dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<TimeBasedDedupOperator>(NUM_DEDUP_PARTITIONS));
+ }
+ }
+
+ public static class TestDeduper extends TimeBasedDedupOperator implements StatsListener
+ {
+ int operatorId;
+ HashMap<Integer, Integer> partitionMap = Maps.newHashMap();
+ transient CountDownLatch latch = new CountDownLatch(1);
+ int tuplesProcessed = 0;
+ @AutoMetric
+ int tuplesProcessedCompletely = 0;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ operatorId = context.getId();
+ }
+
+ @Override
+ protected void processTuple(Object tuple)
+ {
+ TestEvent event = (TestEvent)tuple;
+ if (partitionMap.containsKey(event.id)) {
+ if (partitionMap.get(event.id) != operatorId) {
+ testFailed = true;
+ throw new RuntimeException("Wrong tuple assignment");
+ }
+ } else {
+ partitionMap.put(event.id, operatorId);
+ }
+ tuplesProcessed++;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ tuplesProcessedCompletely = tuplesProcessed;
+ }
+
+ @Override
+ public Response processStats(BatchedOperatorStats stats)
+ {
+ Stats.OperatorStats operatorStats = stats.getLastWindowedStats().get(stats.getLastWindowedStats().size() - 1);
+ tuplesProcessedCompletely = (Integer)operatorStats.metrics.get("tuplesProcessedCompletely");
+ if (tuplesProcessedCompletely >= 1000) {
+ latch.countDown();
+ }
+ return null;
+ }
+ }
+
+ public static class TestGenerator extends BaseOperator implements InputOperator
+ {
+
+ public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
+ private final transient Random r = new Random();
+
+ @Override
+ public void emitTuples()
+ {
+ TestEvent event = new TestEvent();
+ event.id = r.nextInt(100);
+ output.emit(event);
+ }
+ }
+
+ public static class TestEvent
+ {
+ private int id;
+ private Date eventTime;
+
+ public TestEvent()
+ {
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public void setId(int id)
+ {
+ this.id = id;
+ }
+
+ public Date getEventTime()
+ {
+ return eventTime;
+ }
+
+ public void setEventTime(Date eventTime)
+ {
+ this.eventTime = eventTime;
+ }
+ }
+
+ /**
+ * This test validates whether a tuple key goes to exactly one partition
+ */
+ @Test
+ public void testDeduperStreamCodec() throws Exception
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ TestDedupApp app = new TestDedupApp();
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.runAsync();
+ app.dedup.latch.await();
+ Assert.assertFalse(testFailed);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
new file mode 100644
index 0000000..4b25341
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperTimeBasedPOJOImplTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+public class DeduperTimeBasedPOJOImplTest
+{
+ private static String applicationPath;
+ private static final String APPLICATION_PATH_PREFIX = "target/DeduperPOJOImplTest";
+ private static final String APP_ID = "DeduperPOJOImplTest";
+ private static final int OPERATOR_ID = 0;
+ private static TimeBasedDedupOperator deduper;
+
+ @Before
+ public void setup()
+ {
+ applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+ deduper = new TimeBasedDedupOperator();
+ deduper.setKeyExpression("key");
+ deduper.setTimeExpression("date.getTime()");
+ deduper.setBucketSpan(10);
+ deduper.setExpireBefore(60);
+ FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
+ fAccessImpl.setBasePath(applicationPath + "/bucket_data");
+ deduper.managedState.setFileAccess(fAccessImpl);
+ }
+
+ @Test
+ public void testDedup()
+ {
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+ new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_ID, APP_ID);
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+ OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+ deduper.setup(context);
+ deduper.input.setup(new PortContext(attributes, context));
+ deduper.activate(context);
+ CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+ TestUtils.setSink(deduper.unique, uniqueSink);
+ CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+ TestUtils.setSink(deduper.duplicate, duplicateSink);
+ CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+ TestUtils.setSink(deduper.expired, expiredSink);
+
+ deduper.beginWindow(0);
+
+ long millis = System.currentTimeMillis();
+ for (int i = 0; i < 100; i++) {
+ TestPojo pojo = new TestPojo(i, new Date(millis + i));
+ deduper.input.process(pojo);
+ }
+ TestPojo expiredPojo = new TestPojo(100, new Date(millis - 1000 * 60));
+ deduper.input.process(expiredPojo);
+ for (int i = 90; i < 200; i++) {
+ TestPojo pojo = new TestPojo(i, new Date(millis + i));
+ deduper.input.process(pojo);
+ }
+ deduper.handleIdleTime();
+ deduper.endWindow();
+ Assert.assertTrue(uniqueSink.collectedTuples.size() == 200);
+ Assert.assertTrue(duplicateSink.collectedTuples.size() == 10);
+ Assert.assertTrue(expiredSink.collectedTuples.size() == 1);
+
+ deduper.teardown();
+ }
+
+ @After
+ public void teardown()
+ {
+ Path root = new Path(applicationPath);
+ try {
+ FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration());
+ fs.delete(root, true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7dea3d0a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
new file mode 100644
index 0000000..6311517
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/TestPojo.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Date;
+
+public class TestPojo
+{
+ private long key;
+ private Date date;
+ public long sequence;
+
+ public TestPojo()
+ {
+ }
+
+ public TestPojo(long key, Date date)
+ {
+ this.key = key;
+ this.date = date;
+ }
+
+ public TestPojo(long key, Date date, long sequence)
+ {
+ this.key = key;
+ this.date = date;
+ this.sequence = sequence;
+ }
+
+ public long getKey()
+ {
+ return key;
+ }
+
+ public Date getDate()
+ {
+ return date;
+ }
+
+ public void setKey(long key)
+ {
+ this.key = key;
+ }
+
+ public void setDate(Date date)
+ {
+ this.date = date;
+ }
+
+ public long getSequence()
+ {
+ return sequence;
+ }
+
+ public void setSequence(long sequence)
+ {
+ this.sequence = sequence;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TestPojo [key=" + key + ", date=" + date + ", sequence=" + sequence + "]";
+ }
+
+}