Posted to users@apex.apache.org by Vivek Bhide <vi...@target.com> on 2018/03/15 21:05:24 UTC

Need help on TimeBasedDedupOperator

Hi, 

I'm trying to understand how TimeBasedDedupOperator works for my streaming
application. I'm using the Malhar dedup example:
https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java

I made a few modifications to minimize the output.
Properties: 
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
    <value>id</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
    <value>eventTime.getTime()</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
    <value>10</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
    <value>60</value>
  </property>

Below is the application code:

public class Application implements StreamingApplication 
{ 

  @Override 
  public void populateDAG(DAG dag, Configuration conf) 
  { 
    // Test Data Generator Operator
    RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());

    // Dedup Operator. Configuration through resources/META-INF/properties.xml
    TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());

    // Console output operator for unique tuples
    ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());

    // Streams
    dag.addStream("Generator to Dedup", gen.output, dedup.input);

    // Connect Dedup unique to Console
    dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);

    // Set Attribute TUPLE_CLASS for supplying schema information to the port
    dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
  }

  public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
  {

    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
    private final transient Random r = new Random(); 
    private int tuplesPerWindow = 100; 
    private transient int count = 0; 

    @Override 
    public void beginWindow(long windowId) 
    { 
      count = 0; 
    } 

    @Override 
    public void emitTuples() 
    { 
      if (count++ > tuplesPerWindow) { 
        return; 
      } 
      TestEvent event = new TestEvent(); 
      event.id = r.nextInt(2); 
      long millis = System.currentTimeMillis(); 
      event.millis = millis; 
      event.setTimeNow(new Date(millis)); 
//      event.eventTime = new Date( millis - (r.nextInt(60 * 1000))); 
      event.eventTime = new Date(millis); 
      output.emit(event); 
    } 
  } 

  public static class TestEvent 
  { 
    private int id; 
    private Date timeNow; 
    private Date eventTime; 
    private long millis; 

    public TestEvent() 
    { 
    } 
    public long getMillis() { return millis; } 

    public int getId() 
    { 
      return id; 
    } 

    public void setId(int id) 
    { 
      this.id = id; 
    } 

    public Date getEventTime() 
    { 
      return eventTime; 
    } 

    public void setTimeNow(Date timeNow) { 
      this.timeNow = timeNow; 
    } 

    public Date getTimeNow() { 
      return timeNow; 
    } 

    public void setEventTime(Date eventTime) 
    { 
      this.eventTime = eventTime; 
    } 

    @Override
    public String toString()
    {
      return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow + "; eventTime=" + eventTime + "]";
    }

  } 

} 

I ran this application from a JUnit test using LocalMode, but the console
output contains duplicate records. I'm trying to understand why duplicate
messages appear on the unique console:
1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]

I see a lot of duplicates on the unique port. Did I set any configuration wrong?

Any suggestions are appreciated. 

Thanks



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/

Re: Need help on TimeBasedDedupOperator

Posted by Vivek Bhide <vi...@target.com>.
Thanks Bhupesh.

Regards
Vivek




Re: Need help on TimeBasedDedupOperator

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Hi Vivek,

The deduper assumes a binding of the dedup key with the timestamp (expiry
key) in case of dedup with expiry.
See https://apex.apache.org/docs/malhar/operators/deduper/#assumptions for
more details.
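
One way to read this, as a sketch rather than a statement of how Malhar
works internally: with expiry enabled, all tuples that share a dedup key
are expected to carry the same time value. The generator in the question
stamps every tuple with its own System.currentTimeMillis(), so two tuples
with id=1 almost never share an eventTime. The helper below is
hypothetical (KeyBoundEventTime and eventTimeFor are names made up for
illustration, not Malhar APIs) and shows one way test data could keep id
and eventTime bound together:

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Malhar: binds each id to a single event time.
class KeyBoundEventTime {
  // First event time (epoch millis) assigned to each id.
  private final Map<Integer, Long> timeById = new HashMap<>();

  // Returns the event time bound to this id; the first call for an id fixes it.
  public Date eventTimeFor(int id, long nowMillis) {
    return new Date(timeById.computeIfAbsent(id, k -> nowMillis));
  }

  public static void main(String[] args) {
    KeyBoundEventTime binder = new KeyBoundEventTime();
    Date a = binder.eventTimeFor(1, 1520413075333L);
    Date b = binder.eventTimeFor(1, 1520413075334L); // same id, one millisecond later
    Date c = binder.eventTimeFor(0, 1520413075363L); // different id
    System.out.println(a.equals(b)); // prints true: same id, same event time
    System.out.println(a.equals(c)); // prints false: each id has its own event time
  }
}
```

In emitTuples() this would mean replacing event.eventTime = new Date(millis)
with something like event.eventTime = binder.eventTimeFor(event.id, millis).
Whether that removes the duplicates shown in the question was not confirmed
in this thread.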

~ Bhupesh




