Posted to users@apex.apache.org by Vivek Bhide <vi...@target.com> on 2018/03/15 21:05:24 UTC
Need help on TimeBasedDedupOperator
Hi,
I'm trying to understand how TimeBasedDedupOperator works for my streaming
application. I'm using the Malhar dedup example:
https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
I made a few modifications to minimize the output.
Properties:
<property>
  <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
  <value>id</value>
</property>
<property>
  <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
  <value>eventTime.getTime()</value>
</property>
<property>
  <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
  <value>10</value>
</property>
<property>
  <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
  <value>60</value>
</property>
Below is the application code:
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Test Data Generator Operator
    RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());
    // Dedup Operator. Configuration through resources/META-INF/properties.xml
    TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());
    // Console output operator for unique tuples
    ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());
    // Streams
    dag.addStream("Generator to Dedup", gen.output, dedup.input);
    // Connect Dedup unique to Console
    dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);
    // Set Attribute TUPLE_CLASS for supplying schema information to the port
    dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
  }

  public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
    private final transient Random r = new Random();
    private int tuplesPerWindow = 100;
    private transient int count = 0;

    @Override
    public void beginWindow(long windowId)
    {
      count = 0;
    }

    @Override
    public void emitTuples()
    {
      if (count++ > tuplesPerWindow) {
        return;
      }
      TestEvent event = new TestEvent();
      event.id = r.nextInt(2);
      long millis = System.currentTimeMillis();
      event.millis = millis;
      event.setTimeNow(new Date(millis));
      // event.eventTime = new Date( millis - (r.nextInt(60 * 1000)));
      event.eventTime = new Date(millis);
      output.emit(event);
    }
  }

  public static class TestEvent
  {
    private int id;
    private Date timeNow;
    private Date eventTime;
    private long millis;

    public TestEvent()
    {
    }

    public long getMillis()
    {
      return millis;
    }

    public int getId()
    {
      return id;
    }

    public void setId(int id)
    {
      this.id = id;
    }

    public Date getEventTime()
    {
      return eventTime;
    }

    public void setTimeNow(Date timeNow)
    {
      this.timeNow = timeNow;
    }

    public Date getTimeNow()
    {
      return timeNow;
    }

    public void setEventTime(Date eventTime)
    {
      this.eventTime = eventTime;
    }

    @Override
    public String toString()
    {
      return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow + "; eventTime=" + eventTime + "]";
    }
  }
}
I ran this application from a JUnit test using LocalMode, but the console
output shows duplicate records. I'm trying to understand why duplicates
appear on the unique console:
1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07
00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07
00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07
00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07
00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07
00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
I see a lot of duplicates on the unique port. Did I set any configuration wrong?
Any suggestions are appreciated.
Thanks
--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
Re: Need help on TimeBasedDedupOperator
Posted by Vivek Bhide <vi...@target.com>.
Thanks Bhupesh.
Regards
Vivek
Re: Need help on TimeBasedDedupOperator
Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Hi Vivek,
When deduplicating with expiry, the deduper assumes that the dedup key is
bound to the timestamp (the expiry key).
See https://apex.apache.org/docs/malhar/operators/deduper/#assumptions for
more details.
~ Bhupesh
_______________________________________________________
Bhupesh Chawda
E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org
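One way to read the assumption mentioned in the reply above is that tuples
sharing a dedup key are expected to carry the same time value. The sketch
below is only an illustration of that reading, not part of Malhar: the class
name KeyTimeBindingCheck and the method keyDeterminesTime are hypothetical,
and the check simply tests whether each key is always paired with a single
timestamp. The first data set is taken from the output posted in this thread;
the second is made-up data in which the binding holds.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyTimeBindingCheck {

  /**
   * Returns true when every key is always paired with the same time value.
   * Each event is a {key, timeMillis} pair. Hypothetical helper, not a Malhar API.
   */
  static boolean keyDeterminesTime(long[][] events) {
    Map<Long, Long> timeByKey = new HashMap<>();
    for (long[] event : events) {
      // putIfAbsent returns the time already recorded for this key, or null.
      Long previous = timeByKey.putIfAbsent(event[0], event[1]);
      if (previous != null && previous.longValue() != event[1]) {
        return false; // same key seen with a different time
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Two tuples from the output in this thread: same id, different millis.
    long[][] fromThread = {
      {1L, 1520413075333L},
      {1L, 1520413075334L},
    };
    // Hypothetical data in which each id always carries one fixed time.
    long[][] bound = {
      {0L, 1520413075000L},
      {1L, 1520413082000L},
      {0L, 1520413075000L},
    };
    System.out.println("thread data bound: " + keyDeterminesTime(fromThread));
    System.out.println("hypothetical data bound: " + keyDeterminesTime(bound));
  }
}
```

In the generator posted in this thread, eventTime is taken from
System.currentTimeMillis() on every emit, so the same id is paired with many
different times. If the reading above is right, test data would need a time
value that is derived from, or fixed per, the key.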