You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/24 09:33:07 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2185: Added Bounded Deduper
Repository: apex-malhar
Updated Branches:
refs/heads/master 822323d02 -> 17f6c5523
APEXMALHAR-2185: Added Bounded Deduper
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/cc62a5eb
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/cc62a5eb
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/cc62a5eb
Branch: refs/heads/master
Commit: cc62a5eb7d6e58a01bf3e5a32edc889ceb43a75b
Parents: 1700725
Author: bhupeshchawda <bh...@gmail.com>
Authored: Thu Aug 11 15:40:07 2016 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Mon Aug 22 10:43:33 2016 +0530
----------------------------------------------------------------------
.../apex/malhar/lib/dedup/AbstractDeduper.java | 25 ++-
.../malhar/lib/dedup/BoundedDedupOperator.java | 206 +++++++++++++++++++
.../lib/dedup/TimeBasedDedupOperator.java | 31 ++-
.../lib/dedup/DeduperBoundedPOJOImplTest.java | 110 ++++++++++
4 files changed, 359 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
index d06acc3..13a3475 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/AbstractDeduper.java
@@ -30,9 +30,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.AbstractManagedStateImpl;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -48,7 +50,6 @@ import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
-import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.netlet.util.Slice;
/**
@@ -68,6 +69,9 @@ import com.datatorrent.netlet.util.Slice;
public abstract class AbstractDeduper<T>
implements Operator, Operator.IdleTimeHandler, ActivationListener<Context>, Operator.CheckpointNotificationListener
{
+
+ private static final String BUCKET_DIR = "bucket_data";
+
/**
* The input port on which events are received.
*/
@@ -102,7 +106,7 @@ public abstract class AbstractDeduper<T>
private boolean preserveTupleOrder = true;
@NotNull
- protected final ManagedTimeUnifiedStateImpl managedState = new ManagedTimeUnifiedStateImpl();
+ protected AbstractManagedStateImpl managedState;
/**
* Map to hold the result of a tuple processing (unique, duplicate, expired or error) until previous
@@ -123,9 +127,8 @@ public abstract class AbstractDeduper<T>
@Override
public void setup(OperatorContext context)
{
- FileAccessFSImpl fAccessImpl = new TFileImpl.DTFileImpl();
- fAccessImpl.setBasePath(context.getValue(DAG.APPLICATION_PATH) + "/bucket_data");
- managedState.setFileAccess(fAccessImpl);
+ ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
+ + Path.SEPARATOR + BUCKET_DIR);
managedState.setup(context);
if (preserveTupleOrder) {
@@ -155,9 +158,7 @@ public abstract class AbstractDeduper<T>
*/
protected void processTuple(T tuple)
{
-
- long time = getTime(tuple);
- Future<Slice> valFuture = managedState.getAsync(time, getKey(tuple));
+ Future<Slice> valFuture = getAsyncManagedState(tuple);
if (valFuture.isDone()) {
try {
@@ -211,7 +212,7 @@ public abstract class AbstractDeduper<T>
{
if (!preserveTupleOrder || waitingEvents.isEmpty()) {
if (value == null) {
- managedState.put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+ putManagedState(tuple);
processUnique(tuple);
} else {
processDuplicate(tuple);
@@ -309,7 +310,7 @@ public abstract class AbstractDeduper<T>
if (future.isDone() || finalize ) {
try {
if (future.get() == null && asyncEvents.get(tupleKey) == null) {
- managedState.put(tupleTime, tupleKey, new Slice(new byte[0]));
+ putManagedState(tuple);
asyncEvents.put(tupleKey, tupleTime);
processUnique(tuple);
} else {
@@ -339,6 +340,10 @@ public abstract class AbstractDeduper<T>
managedState.endWindow();
}
+ protected abstract Future<Slice> getAsyncManagedState(T tuple);
+
+ protected abstract void putManagedState(T tuple);
+
/**
* Records a decision for use later. This is needed to ensure that the order of incoming tuples is maintained.
*
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
new file mode 100644
index 0000000..e71762e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
+import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
+
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of {@link AbstractDeduper} which handles the case of a bounded data set.
+ * This implementation assumes that the incoming tuple does not have a time field, and the de-duplication
+ * is to be strictly based on the key of the tuple.
+ *
+ * This implementation uses {@link ManagedTimeStateImpl} for storing the tuple keys on the persistent storage.
+ * Every key is stored with the same constant time value; the hash of the serialized key selects the bucket.
+ *
+ * Following properties need to be configured for the functioning of the operator:
+ * 1. {@link #keyExpression}: The java expression to extract the key fields in the incoming tuple (POJO)
+ * 2. {@link #numBuckets} (optional): The number of buckets that need to be used for storing the keys of the
+ * incoming tuples.
+ * NOTE: Users can decide upon the proper value for this parameter by estimating the number of distinct keys
+ * in the application. An appropriate value would be sqrt(num distinct keys). If the number of distinct keys is
+ * very large, leave it unset (or set it to 0) so that the default value of 46340 is used. The rationale for
+ * this number is that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be
+ * equal to the size of each bucket, thus spreading the load equally among the buckets.
+ *
+ */
+@Evolving
+public class BoundedDedupOperator extends AbstractDeduper<Object>
+{
+  /** Time value used for every key; this operator does not read a time field from the tuple. */
+  private static final long DEFAULT_CONSTANT_TIME = 0;
+  /** Default bucket count: floor(sqrt(Integer.MAX_VALUE)). */
+  private static final int DEFAULT_NUM_BUCKETS = 46340;
+
+  // Required properties
+  @NotNull
+  private String keyExpression;
+
+  // Optional, but recommended to be provided by user. A value of 0 is replaced by the default in setup().
+  private int numBuckets = DEFAULT_NUM_BUCKETS;
+
+  private transient Class<?> pojoClass;
+  private transient Getter<Object, Object> keyGetter;
+  private transient StreamCodec<Object> streamCodec;
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      // The tuple class is needed to build the key getter in activate().
+      pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+      // The same codec instance is returned by getStreamCodec() and used by getKey() for key serialization.
+      streamCodec = getDeduperStreamCodec();
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+
+    @Override
+    public StreamCodec<Object> getStreamCodec()
+    {
+      return streamCodec;
+    }
+  };
+
+  public BoundedDedupOperator()
+  {
+    managedState = new ManagedTimeStateImpl();
+  }
+
+  /**
+   * Configures the bucket count and the time bucket assigner of the managed state, then delegates to
+   * {@link AbstractDeduper#setup(OperatorContext)}.
+   *
+   * @param context the operator context
+   * @throws IllegalArgumentException if {@link #numBuckets} is negative
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (numBuckets < 0) {
+      throw new IllegalArgumentException("numBuckets must not be negative: " + numBuckets);
+    }
+    if (numBuckets == 0) {
+      // 0 means "not configured": fall back to the default.
+      numBuckets = DEFAULT_NUM_BUCKETS;
+    }
+    ((ManagedTimeStateImpl)managedState).setNumBuckets(numBuckets);
+    TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+    managedState.setTimeBucketAssigner(timeBucketAssigner);
+    super.setup(context);
+  }
+
+  /**
+   * Builds the getter for {@link #keyExpression} against the tuple class captured in the input port's setup.
+   */
+  @Override
+  public void activate(Context context)
+  {
+    keyGetter = PojoUtils.createGetter(pojoClass, keyExpression, Object.class);
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  /**
+   * Returns the constant time value; tuples in a bounded data set carry no time field.
+   */
+  @Override
+  protected long getTime(Object tuple)
+  {
+    return DEFAULT_CONSTANT_TIME;
+  }
+
+  /**
+   * Extracts the key with {@link #keyGetter} and serializes it with {@link #streamCodec}.
+   * Requires {@link #activate(Context)} and the input port's setup to have run.
+   */
+  @Override
+  protected Slice getKey(Object tuple)
+  {
+    Object key = keyGetter.get(tuple);
+    return streamCodec.toByteArray(key);
+  }
+
+  /**
+   * Creates the codec used for the input port and for key serialization.
+   */
+  protected StreamCodec<Object> getDeduperStreamCodec()
+  {
+    return new DeduperStreamCodec(keyExpression);
+  }
+
+  /**
+   * Looks up the tuple's serialized key in the bucket selected by {@link #getBucketId(Slice)}.
+   */
+  @Override
+  protected Future<Slice> getAsyncManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    Future<Slice> valFuture = ((ManagedTimeStateImpl)managedState).getAsync(getBucketId(key), key);
+    return valFuture;
+  }
+
+  /**
+   * Stores the tuple's serialized key, with an empty value, in the bucket selected by {@link #getBucketId(Slice)}.
+   */
+  @Override
+  protected void putManagedState(Object tuple)
+  {
+    Slice key = getKey(tuple);
+    ((ManagedTimeStateImpl)managedState).put(getBucketId(key), DEFAULT_CONSTANT_TIME,
+        key, new Slice(new byte[0]));
+  }
+
+  /**
+   * Maps a serialized key to a bucket id in the range [0, numBuckets).
+   * The sign bit of the hash is cleared before taking the remainder, because Java's {@code %} keeps the
+   * sign of the dividend and would otherwise yield negative bucket ids for negative hash codes.
+   * NOTE(review): the whole backing array is hashed, which assumes the codec returns a slice that covers
+   * the full array (offset 0, length == buffer.length) -- confirm if the codec changes.
+   *
+   * @param key the serialized key
+   * @return a non-negative bucket id smaller than {@link #numBuckets}
+   */
+  protected int getBucketId(Slice key)
+  {
+    return (Arrays.hashCode(key.buffer) & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  /**
+   * Returns the key expression
+   * @return key expression
+   */
+  public String getKeyExpression()
+  {
+    return keyExpression;
+  }
+
+  /**
+   * Sets the key expression for the fields used for de-duplication
+   * @param keyExpression the expression
+   */
+  public void setKeyExpression(String keyExpression)
+  {
+    this.keyExpression = keyExpression;
+  }
+
+  /**
+   * Returns the number of buckets
+   * @return number of buckets
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   * NOTE: Users can decide upon the proper value for this parameter by estimating the number of distinct keys
+   * in the application. An appropriate value would be sqrt(num distinct keys). If the number of distinct keys is
+   * very large, leave it unset (or set it to 0) so that the default value of 46340 is used. The rationale for
+   * this number is that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be
+   * equal to the size of each bucket, thus spreading the load equally among the buckets.
+   * A negative value is rejected when the operator is set up.
+   * @param numBuckets the number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
index 6aebe6b..225c8a3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.java
@@ -18,11 +18,13 @@
*/
package org.apache.apex.malhar.lib.dedup;
+import java.util.concurrent.Future;
+
import javax.validation.constraints.NotNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
-
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -95,6 +97,13 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
private transient Getter<Object, Object> keyGetter;
+ private transient StreamCodec<Object> streamCodec;
+
+ public TimeBasedDedupOperator()
+ {
+ managedState = new ManagedTimeUnifiedStateImpl();
+ }
+
@InputPortFieldAnnotation(schemaRequired = true)
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@@ -102,6 +111,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
public void setup(PortContext context)
{
pojoClass = context.getAttributes().get(PortContext.TUPLE_CLASS);
+ streamCodec = getDeduperStreamCodec();
}
@Override
@@ -113,7 +123,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
@Override
public StreamCodec<Object> getStreamCodec()
{
- return getDeduperStreamCodec();
+ return streamCodec;
}
};
@@ -130,7 +140,7 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
protected Slice getKey(Object tuple)
{
Object key = keyGetter.get(tuple);
- return new Slice(key.toString().getBytes());
+ return streamCodec.toByteArray(key);
}
protected StreamCodec<Object> getDeduperStreamCodec()
@@ -165,6 +175,21 @@ public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements A
{
}
+ @Override
+ protected Future<Slice> getAsyncManagedState(Object tuple)
+ {
+ Future<Slice> valFuture = ((ManagedTimeUnifiedStateImpl)managedState).getAsync(getTime(tuple),
+ getKey(tuple));
+ return valFuture;
+ }
+
+ @Override
+ protected void putManagedState(Object tuple)
+ {
+ ((ManagedTimeUnifiedStateImpl)managedState).put(getTime(tuple), getKey(tuple), new Slice(new byte[0]));
+ }
+
+
public String getKeyExpression()
{
return keyExpression;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/cc62a5eb/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
new file mode 100644
index 0000000..448e76f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperBoundedPOJOImplTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.dedup;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.stram.engine.PortContext;
+
+/**
+ * Tests {@link BoundedDedupOperator}: 1000 distinct keys interleaved with 100 re-sent keys must yield
+ * 1000 unique and 100 duplicate tuples.
+ */
+public class DeduperBoundedPOJOImplTest
+{
+  private static final String APPLICATION_PATH_PREFIX = "target/DeduperBoundedPOJOImplTest";
+  private static final String APP_ID = "DeduperBoundedPOJOImplTest";
+  private static final int OPERATOR_ID = 0;
+  private static final int NUM_BUCKETS = 10;
+
+  // Per-test state, re-created in setup(); kept non-static so tests never share an operator or directory.
+  private String applicationPath;
+  private BoundedDedupOperator deduper;
+
+  @Before
+  public void setup()
+  {
+    applicationPath = OperatorContextTestHelper.getUniqueApplicationPath(APPLICATION_PATH_PREFIX);
+    deduper = new BoundedDedupOperator();
+    deduper.setKeyExpression("key");
+    deduper.setNumBuckets(NUM_BUCKETS);
+  }
+
+  @Test
+  public void testDedup()
+  {
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributes =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_ID, APP_ID);
+    attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    attributes.put(DAG.InputPortMeta.TUPLE_CLASS, TestPojo.class);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes);
+    deduper.setup(context);
+    deduper.input.setup(new PortContext(attributes, context));
+    deduper.activate(context);
+    CollectorTestSink<TestPojo> uniqueSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.unique, uniqueSink);
+    CollectorTestSink<TestPojo> duplicateSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.duplicate, duplicateSink);
+    CollectorTestSink<TestPojo> expiredSink = new CollectorTestSink<TestPojo>();
+    TestUtils.setSink(deduper.expired, expiredSink);
+
+    deduper.beginWindow(0);
+    Random r = new Random();
+    int k = 1;
+    for (int i = 1; i <= 1000; i++) {
+      TestPojo pojo = new TestPojo(i, new Date(), k++);
+      deduper.input.process(pojo);
+      if (i % 10 == 0) {
+        // Re-send a key that has already been processed (ids 1..i); id 0 was never sent, so map it to 1.
+        int dupId = r.nextInt(i);
+        TestPojo pojoDuplicate = new TestPojo(dupId == 0 ? 1 : dupId, new Date(), k++);
+        deduper.input.process(pojoDuplicate);
+      }
+    }
+    // Drain lookups that were still pending when the window ends.
+    deduper.handleIdleTime();
+    deduper.endWindow();
+
+    Assert.assertEquals(1000, uniqueSink.collectedTuples.size());
+    Assert.assertEquals(100, duplicateSink.collectedTuples.size());
+    Assert.assertEquals(0, expiredSink.collectedTuples.size());
+
+    deduper.teardown();
+  }
+
+  @After
+  public void teardown()
+  {
+    Path root = new Path(applicationPath);
+    // FileSystem.newInstance() always returns a new, uncached instance, so it must be closed here.
+    try (FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration())) {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2185-bounded-dedup' of https://github.com/bhupeshchawda/apex-malhar
Posted by ch...@apache.org.
Merge branch 'APEXMALHAR-2185-bounded-dedup' of https://github.com/bhupeshchawda/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/17f6c552
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/17f6c552
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/17f6c552
Branch: refs/heads/master
Commit: 17f6c5523e33cae6d70f9260e295f92e44894bcd
Parents: 822323d cc62a5e
Author: Chinmay Kolhatkar <ch...@datatorrent.com>
Authored: Wed Aug 24 15:01:52 2016 +0530
Committer: Chinmay Kolhatkar <ch...@datatorrent.com>
Committed: Wed Aug 24 15:01:52 2016 +0530
----------------------------------------------------------------------
.../apex/malhar/lib/dedup/AbstractDeduper.java | 25 ++-
.../malhar/lib/dedup/BoundedDedupOperator.java | 206 +++++++++++++++++++
.../lib/dedup/TimeBasedDedupOperator.java | 31 ++-
.../lib/dedup/DeduperBoundedPOJOImplTest.java | 110 ++++++++++
4 files changed, 359 insertions(+), 13 deletions(-)
----------------------------------------------------------------------