You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by 唐思成 <ja...@qq.com> on 2014/07/14 10:14:04 UTC

How to implememt distinct count in trident topolgy?

Use case is simple, count unique user in for in a window slide, and I found the common solutions over the Internet is to use HashSet to fliter the duplicated user,like this 

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id);
    }
    public String getId(TridentTuple t) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}

However, the HashSet is stored in memory, when the data grows to a very large level, I think it will cause a OOM.
So is there a scalable solution?

2014-07-14 



唐思成 

Re: Re: Re: How to implememt distinct count in trident topolgy?

Posted by 唐思成 <ja...@qq.com>.
Hi, I run my topology on a cluster(one masternode and two workernode), I found a something interesting that I wanna share.

According the offical document, the trident topolgy will be complied to a normal storm topolgy.


that means what you write in the code and what you see in the UI may be different,

here is my topology code, I call hte name(String) method, try to name a bolt or spout with a meanful name.


however, this is what I sea on the UI



obviously, the name(String) method does not help much, and actually is difficult to understand what  a bolt or spout actually do.

For instance,  I found the b-0-groupByGameTypeBolt-aggregateBolt  has a relativly high process latency, and I try to find what is going on, here is what I see after click the   boltname
 
From the UI the bolt has a input stream named $coord-bg0 with very high lantency, but I dont know what this stream for since I don't code this stream in my source code. 


Thank you for your patients if you read here, and if you use trident topology in production already, I would love to hear how you write you topology to make the UI information more meaningful, and how you track down the bottelneck of your storm cluster.



2014-07-18 



唐思成 



发件人: 唐思成 
发送时间: 2014-07-18  10:29:42 
收件人: user 
抄送: 
主题: Re: Re: How to implememt distinct count in trident topolgy? 
 
Hi, I find trident topology is  a elegant solution to this problem, suppose I have a spout emitting user logging information, here is the model

public class ActionLog implements Serializable{
    private static final long serialVersionUID = 22583958918591L;
    private String actionType;
    private String passPort;
    private String game;
    private Date sTime;    
}

if i want to caculate unique user of each game in real-time,here is my topology


inputStream.each(new Fields("actionLog"), new addNewFiled("user"), new Fields("user"))
                    .groupBy(new Fields("user"))
                    .aggregate(new One(), new Fields("one"))  //this step is equal to sql distinct
                    .persistentAggregate(new RedisState.SnapshottableStateFactory("distinctCountUser"), new Count(), new Fields("distinCount"))
                    .newValuesStream()
                    .each(new Fields("distinCount"), new justLogger());

there is an another example  http://storm.incubator.apache.org/documentation/Trident-tutorial.html    Reach

The topology works on my local cluster, but I dont think very throughly, I wannt to put this implementation on production, so there are still lot work to be done, any suggestiong and ideas are truly welcome.
2014-07-18 



唐思成 



发件人: Sam Goodwin 
发送时间: 2014-07-17  05:45:03 
收件人: user@storm.incubator.apache.org 
抄送: 
主题: Re: How to implememt distinct count in trident topolgy? 
Even with Redis you'll need to maintain the sliding window yourself.


Does it need to be exact? If you want to estimate the number of distinct users seen in a sliding window then use the HyperLogLog data structure with a ring buffer. It's fast, accurate and memory efficient. For example, allocate 60 HyperLogLog structure for 60 minutes (1 per minute) and then use a Ring Buffer algorithm to maintain the sliding window. When you want the total count you can just merge all the HyperLogLog structures and extract the count. It's not exact but it's close enough and can be tuned based on your precision requirements.

Twitter's alegebird package has a monoid implementation of the HLL algorithm https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353

 which basically means you can merge them into 1 and should be all you need. The Redis HLL also allows you to merge two HLLs. Please note that using the Redis HLL algorithm will make it harder to implement a transactional topology. If you ever want that then I suggest you implement the above algorithm and serialize/deserialize in your TridentState.


If you want a more precise window you can just increase the bucket counts. You may also be able to adapt this exponential histogram sliding window algorithm for your needs http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf



On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <da...@schiavuzzi.com> wrote:

Take a look at a distributed data structure server, for example Redis. The are various Storm integrations available.

On Monday, July 14, 2014, 唐思成 <ja...@qq.com> wrote:

Use case is simple, count unique user in for in a window slide, and I found the common solutions over the Internet is to use HashSet to fliter the duplicated user,like this 

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id);
    }
    public String getId(TridentTuple t) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}

However, the HashSet is stored in memory, when the data grows to a very large level, I think it will cause a OOM.
So is there a scalable solution?

2014-07-14 



唐思成 


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Re: How to implememt distinct count in trident topolgy?

Posted by 唐思成 <ja...@qq.com>.
Hi, I find trident topology is  a elegant solution to this problem, suppose I have a spout emitting user logging information, here is the model

public class ActionLog implements Serializable{
    private static final long serialVersionUID = 22583958918591L;
    private String actionType;
    private String passPort;
    private String game;
    private Date sTime;    
}

if i want to caculate unique user of each game in real-time,here is my topology


inputStream.each(new Fields("actionLog"), new addNewFiled("user"), new Fields("user"))
                    .groupBy(new Fields("user"))
                    .aggregate(new One(), new Fields("one"))  //this step is equal to sql distinct
                    .persistentAggregate(new RedisState.SnapshottableStateFactory("distinctCountUser"), new Count(), new Fields("distinCount"))
                    .newValuesStream()
                    .each(new Fields("distinCount"), new justLogger());

there is an another example  http://storm.incubator.apache.org/documentation/Trident-tutorial.html    Reach

The topology works on my local cluster, but I dont think very throughly, I wannt to put this implementation on production, so there are still lot work to be done, any suggestiong and ideas are truly welcome.
2014-07-18 



唐思成 



发件人: Sam Goodwin 
发送时间: 2014-07-17  05:45:03 
收件人: user@storm.incubator.apache.org 
抄送: 
主题: Re: How to implememt distinct count in trident topolgy? 
 
Even with Redis you'll need to maintain the sliding window yourself.


Does it need to be exact? If you want to estimate the number of distinct users seen in a sliding window then use the HyperLogLog data structure with a ring buffer. It's fast, accurate and memory efficient. For example, allocate 60 HyperLogLog structure for 60 minutes (1 per minute) and then use a Ring Buffer algorithm to maintain the sliding window. When you want the total count you can just merge all the HyperLogLog structures and extract the count. It's not exact but it's close enough and can be tuned based on your precision requirements.

Twitter's alegebird package has a monoid implementation of the HLL algorithm https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353

 which basically means you can merge them into 1 and should be all you need. The Redis HLL also allows you to merge two HLLs. Please note that using the Redis HLL algorithm will make it harder to implement a transactional topology. If you ever want that then I suggest you implement the above algorithm and serialize/deserialize in your TridentState.


If you want a more precise window you can just increase the bucket counts. You may also be able to adapt this exponential histogram sliding window algorithm for your needs http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf



On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <da...@schiavuzzi.com> wrote:

Take a look at a distributed data structure server, for example Redis. The are various Storm integrations available.

On Monday, July 14, 2014, 唐思成 <ja...@qq.com> wrote:

Use case is simple, count unique user in for in a window slide, and I found the common solutions over the Internet is to use HashSet to fliter the duplicated user,like this 

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id);
    }
    public String getId(TridentTuple t) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}

However, the HashSet is stored in memory, when the data grows to a very large level, I think it will cause a OOM.
So is there a scalable solution?

2014-07-14 



唐思成 


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: How to implememt distinct count in trident topolgy?

Posted by Sam Goodwin <sa...@gmail.com>.
Even with Redis you'll need to maintain the sliding window yourself.

Does it need to be exact? If you want to estimate the number of distinct
users seen in a sliding window then use the HyperLogLog data structure with
a ring buffer. It's fast, accurate and memory efficient. For example,
allocate 60 HyperLogLog structure for 60 minutes (1 per minute) and then
use a Ring Buffer algorithm to maintain the sliding window. When you want
the total count you can just merge all the HyperLogLog structures and
extract the count. It's not exact but it's close enough and can be tuned
based on your precision requirements.

Twitter's alegebird package has a monoid implementation of the HLL
algorithm
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353
 which basically means you can merge them into 1 and should be all you
need. The Redis HLL also allows you to merge two HLLs. Please note that
using the Redis HLL algorithm will make it harder to implement a
transactional topology. If you ever want that then I suggest you implement
the above algorithm and serialize/deserialize in your TridentState.

If you want a more precise window you can just increase the bucket counts.
You may also be able to adapt this exponential histogram sliding window
algorithm for your needs
http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf


On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:

> Take a look at a distributed data structure server, for example Redis. The
> are various Storm integrations available.
>
> On Monday, July 14, 2014, 唐思成 <ja...@qq.com> wrote:
>
>>  Use case is simple, count unique user in for in a window slide, and I
>> found the common solutions over the Internet is to use HashSet to fliter
>> the duplicated user,like this
>>
>>  public class Distinct extends BaseFilter {
>>     private static final long serialVersionUID = 1L;
>>
>>     private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());
>>      @Override
>>     public boolean isKeep(TridentTuple tuple) {
>>         String id = this.getId(tuple);
>>         return distincter.add(id);
>>     }
>>      public String getId(TridentTuple t) {
>>         StringBuilder sb = new StringBuilder();
>>         for (int i = 0; i < t.size(); i++) {
>>             sb.append(t.getString(i));
>>         }
>>         return sb.toString();
>>     }
>> }
>>
>> However, the HashSet is stored in memory, when the data grows to a very
>> large level, I think it will cause a OOM.
>> So is there a scalable solution?
>>
>> 2014-07-14
>> ------------------------------
>> 唐思成
>>
>
>
> --
> Danijel Schiavuzzi
>
> E: danijel@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>

Re: How to implememt distinct count in trident topolgy?

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Take a look at a distributed data structure server, for example Redis. The
are various Storm integrations available.

On Monday, July 14, 2014, 唐思成 <ja...@qq.com> wrote:

>  Use case is simple, count unique user in for in a window slide, and I
> found the common solutions over the Internet is to use HashSet to fliter
> the duplicated user,like this
>
>  public class Distinct extends BaseFilter {
>     private static final long serialVersionUID = 1L;
>
>     private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());
>      @Override
>     public boolean isKeep(TridentTuple tuple) {
>         String id = this.getId(tuple);
>         return distincter.add(id);
>     }
>      public String getId(TridentTuple t) {
>         StringBuilder sb = new StringBuilder();
>         for (int i = 0; i < t.size(); i++) {
>             sb.append(t.getString(i));
>         }
>         return sb.toString();
>     }
> }
>
> However, the HashSet is stored in memory, when the data grows to a very
> large level, I think it will cause a OOM.
> So is there a scalable solution?
>
> 2014-07-14
> ------------------------------
> 唐思成
>


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7