Posted to user@spark.apache.org by Gavin Yue <yu...@gmail.com> on 2016/01/08 23:04:34 UTC

How to merge two large table and remove duplicates?

Hey,

I have a separate Event table for each day and want to merge them into a
single Event table. But there are many duplicates across the days' data.

I use Parquet as the data source.  What I am doing now is

EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")

Each day's events are stored in their own Parquet file.

But the job fails at stage 2, which keeps losing its connection to one
executor. I guess this is due to a memory issue.

Any suggestions on how to do this efficiently?

Thanks,
Gavin
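
One way to bound the cost of this merge, along the lines suggested in the replies, is to deduplicate on an event key inside small time buckets instead of comparing every field over the whole union. Below is a minimal sketch in plain Python rather than Spark, so that it stays self-contained. It assumes that (UserID, EventType, EventKey, TimeStamp) identifies an event and that TimeStamp is in UTC epoch seconds; both are assumptions, and the names bucket_id and merge_days are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Assumed event key. The thread only suggests (userId, timestamp); the two
# extra fields are an illustrative choice, not something the thread confirms.
KEY_FIELDS = ("UserID", "EventType", "EventKey", "TimeStamp")


def bucket_id(epoch_seconds):
    """Return a 'YYYY-MM-DD-HH' bucket id for a UTC epoch timestamp."""
    moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d-%H")


def merge_days(*days):
    """Merge per-day event lists, keeping the first copy of each event key."""
    buckets = defaultdict(dict)  # bucket id -> {event key: event}
    for day in days:
        for event in day:
            key = tuple(event[field] for field in KEY_FIELDS)
            # setdefault keeps the first event seen for a key, ignores repeats
            buckets[bucket_id(event["TimeStamp"])].setdefault(key, event)
    # Buckets are independent, so each one could be processed on its own
    return [event for b in sorted(buckets) for event in buckets[b].values()]
```

In Spark, a comparable shape might be DataFrame.dropDuplicates on the key columns, applied one bucket at a time. Whether that relieves the memory pressure depends on the data and the cluster.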

Re: How to merge two large table and remove duplicates?

Posted by Sayan Sanyal <to...@gmail.com>.
Unsubscribe


Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
See the first half of this wiki:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO

> On Jan 9, 2016, at 1:02 AM, Gavin Yue <yu...@gmail.com> wrote:
> 
> So I tried to set the Parquet compression codec to lzo, but Hadoop does not have the LZO natives, while LZ4 is included.
> But I could not set the codec to lz4; it only accepts lzo.
> 
> Any solution here?
> 
> Thanks,
> Gavin
> 
> 
> 
>> On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yu...@gmail.com> wrote:
>> I saw in the documentation that the value is LZO. Is it LZO or LZ4?
>> 
>> https://github.com/Cyan4973/lz4
>> 
>> Based on this benchmark, they differ quite a lot. 
>> 
>> 
>> 
>>> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yu...@gmail.com> wrote:
>>> gzip is relatively slow. It consumes a lot of CPU.
>>> 
>>> snappy is faster.
>>> 
>>> LZ4 is faster than GZIP and smaller than Snappy.
>>> 
>>> Cheers
>>> 
>>>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>> Thank you.
>>>> 
>>>> And speaking of compression, is there a big difference in performance between gzip and snappy? And why does Parquet use gzip by default?
>>>> 
>>>> Thanks.
>>>> 
>>>> 
>>>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>> Cycling old bits:
>>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>> 
>>>>> Gavin:
>>>>> Which release of hbase did you play with ?
>>>>> 
>>>>> HBase has been evolving and is getting more stable.
>>>>> 
>>>>> Cheers
>>>>> 
>>>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>> I used to maintain an HBase cluster, and it was not a happy experience.
>>>>>> 
>>>>>> I just tried querying each day's data first and deduplicating with the smaller set; the performance is acceptable. So I guess I will use this method.
>>>>>> 
>>>>>> Again, could anyone give advice about: 
>>>>>> Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>> Thanks.
>>>>>> 
>>>>>> Gavin
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>> bq. in a NoSQL db such as HBase
>>>>>>> 
>>>>>>> +1 :-)
>>>>>>> 
>>>>>>> 
>>>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:
>>>>>>>> One option you may want to explore is writing the event table to a NoSQL db such as HBase. One inherent problem with your approach is that you always need to load either the full data set or a defined number of partitions to check whether an event has already arrived (and there is no guarantee that it is foolproof, while it leads to unnecessary loading in most cases).
>>>>>>>> 
>>>>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>>>>> Hey, 
>>>>>>>>> Thank you for the answer. I checked the settings you mentioned; they are all correct. I noticed that the job always has only 200 reducers for shuffle read. I believe this is set by the SQL shuffle parallelism.
>>>>>>>>> 
>>>>>>>>> In the doc, it mentions: 
>>>>>>>>> Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> What would be the ideal number for this setting? Is it based on the hardware of cluster?
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Gavin 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <be...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> I assume your Parquet files are compressed. Gzip or Snappy?
>>>>>>>>>> What Spark version did you use? It seems to be at least 1.4. If you use spark-sql and Tungsten, you might get better performance, but Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple group-by and aggregate.
>>>>>>>>>> Did you use Kryo serialization?
>>>>>>>>>> You should have spark.shuffle.compress=true; verify it.
>>>>>>>>>> How many tasks did you use? spark.default.parallelism=?
>>>>>>>>>> What about this:
>>>>>>>>>> 1. Read the data day by day.
>>>>>>>>>> 2. Compute a bucket id from the timestamp, e.g., the date and hour.
>>>>>>>>>> 3. Write into different buckets (you probably need a special writer to write data efficiently without shuffling the data).
>>>>>>>>>> 4. Run distinct for each bucket. Because each bucket is small, Spark can get it done faster than having everything in one run.
>>>>>>>>>> I think using groupBy(userId, timestamp) might be better than distinct. I guess distinct() will compare every field.
>>>>>>>>>> 
>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>>>>>>> And the most frequent operation I am going to do is find the UserIDs that have some events, then retrieve all the events associated with each UserID.
>>>>>>>>>>> 
>>>>>>>>>>> In this case, how should I partition to speed up the process? 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks.
>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>>>>>>>> hey Ted, 
>>>>>>>>>>>> 
>>>>>>>>>>>> The Event table looks like this: UserID, EventType, EventKey, TimeStamp, MetaData. I just parse it from JSON and save it as Parquet; I did not change the partitioning.
>>>>>>>>>>>> 
>>>>>>>>>>>> Annoyingly, each day's incoming Event data has duplicates of other days' data. The same event could show up in Day1 and Day2 and probably Day3.
>>>>>>>>>>>> 
>>>>>>>>>>>> I only want to keep a single Event table, and each day brings so many duplicates.
>>>>>>>>>>>> 
>>>>>>>>>>>> Is there a way I could just insert into Parquet and, if a duplicate is found, ignore it?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Gavin 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>>>>>>>> Is your Parquet data source partitioned by date ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Can you dedup within partitions ?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>>>>>>>>>> I tried it on three days' data. The total input is only 980GB, but the shuffle write is about 6.2TB; the job then failed during the shuffle read step, which should be another 6.2TB.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think shuffling cannot be avoided when deduplicating. Is there anything I could do to stabilize this process?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yu...@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey, 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I have an Event table for each day and want to merge them into a single Event table. But there are so many duplicates across the days' data.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Each day's Events are stored in their own Parquet file.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But it failed at stage 2, which keeps losing its connection to one executor. I guess this is due to a memory issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any suggestions on how to do this efficiently?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Gavin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -- 
>>>>>>>> Best Regards,
>>>>>>>> Ayan Guha
> 
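
For reference, the bucket idea quoted above (compute a bucket id from the timestamp, then dedup inside each bucket) can be sketched in plain Python without Spark. This is only an illustration under assumptions: the function names are made up, TimeStamp is taken to be epoch seconds, and two events are treated as duplicates when UserID, EventType, EventKey and TimeStamp match (MetaData is not compared). It works because a duplicate carries the same timestamp as the original, so both copies land in the same bucket no matter which day's file they arrived in.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_id(ts_seconds):
    """Bucket an event by the UTC date and hour of its timestamp."""
    t = datetime.fromtimestamp(ts_seconds, tz=timezone.utc)
    return t.strftime("%Y-%m-%d-%H")


def dedup_by_bucket(days):
    """Merge several days of events and drop duplicates bucket by bucket.

    `days` is a list of lists of event dicts. The key columns below are an
    assumption about what identifies an event; adjust them to the real schema.
    """
    buckets = defaultdict(dict)
    for day in days:
        for ev in day:
            key = (ev["UserID"], ev["EventType"], ev["EventKey"], ev["TimeStamp"])
            # setdefault keeps the first copy seen and ignores later ones
            buckets[bucket_id(ev["TimeStamp"])].setdefault(key, ev)
    return {b: list(evs.values()) for b, evs in buckets.items()}
```

Each bucket only ever holds one hour of events, which is the point of the suggestion: the dedup state per bucket stays small instead of covering every day at once.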

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
So I tried to set the Parquet compression codec to lzo, but Hadoop does not
have the lzo natives, while lz4 is included.
But I could not set the codec to lz4; it only accepts lzo.

Any solution here?

Thanks,
Gavin
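
On the codec question: the Spark 1.x SQL guide lists uncompressed, snappy, gzip and lzo as the acceptable values for spark.sql.parquet.compression.codec, so lz4 is not an option for Parquet there. A small plain-Python sketch of picking a fallback follows. The helper name and the `natives_available` set are hypothetical, just a way to write down "take the first codec that Spark accepts and that this cluster can load".

```python
# Values the Spark 1.x SQL guide lists for spark.sql.parquet.compression.codec.
SPARK_1X_PARQUET_CODECS = ("uncompressed", "snappy", "gzip", "lzo")


def pick_parquet_codec(preferred, natives_available):
    """Return the first preferred codec that is accepted and loadable.

    `natives_available` is a hypothetical set naming the codecs whose
    libraries the cluster can load; "uncompressed" needs no library.
    """
    for codec in preferred:
        if codec not in SPARK_1X_PARQUET_CODECS:
            continue  # e.g. "lz4" is not an accepted Parquet value here
        if codec == "uncompressed" or codec in natives_available:
            return codec
    return "uncompressed"
```

With a preference list of lz4, lzo, snappy, gzip on a cluster without the lzo natives, this falls through to snappy. The chosen value would then be set with something like sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy").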



On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yu...@gmail.com> wrote:

> I saw in the documentation that the value is LZO. Is it LZO or LZ4?
>
> https://github.com/Cyan4973/lz4
>
> Based on this benchmark, they differ quite a lot.
>
>
>
> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> gzip is relatively slow. It consumes much CPU.
>>
>> snappy is faster.
>>
>> LZ4 is faster than GZIP and smaller than Snappy.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yu...@gmail.com> wrote:
>>
>>> Thank you .
>>>
>>> And speaking of compression, is there a big difference in performance
>>> between gzip and snappy? And why is Parquet using gzip by default?
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Cycling old bits:
>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>
>>>> Gavin:
>>>> Which release of hbase did you play with ?
>>>>
>>>> HBase has been evolving and is getting more stable.
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yu...@gmail.com>
>>>> wrote:
>>>>
>>>>> I used to maintain an HBase cluster. The experience with it was not a
>>>>> happy one.
>>>>>
>>>>> I just tried querying each day's data first and deduplicating with the
>>>>> smaller set; the performance is acceptable. So I guess I will use this
>>>>> method.
>>>>>
>>>>> Again, could anyone give advice about:
>>>>>
>>>>>    - Automatically determine the number of reducers for joins and
>>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>    parallelism post-shuffle using “SET
>>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Gavin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>>> bq. in a NoSQL db such as HBase
>>>>>>
>>>>>> +1 :-)
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <gu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One option you may want to explore is writing the event table in a
>>>>>>> NoSQL db such as HBase. One inherent problem with your approach is that you
>>>>>>> always need to load either the full data set or a defined number of partitions
>>>>>>> to see if the event has already arrived (with no guarantee that it is foolproof,
>>>>>>> and it leads to unnecessary loading in most cases).
>>>>>>>
>>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>> Thank you for the answer. I checked the settings you mentioned; they
>>>>>>>> are all correct. I noticed that in the job there are always only 200
>>>>>>>> reducers for the shuffle read. I believe that comes from the SQL shuffle
>>>>>>>> parallelism setting.
>>>>>>>>
>>>>>>>> In the doc, it mentions:
>>>>>>>>
>>>>>>>>    - Automatically determine the number of reducers for joins and
>>>>>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>>>>    parallelism post-shuffle using “SET
>>>>>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>>>
>>>>>>>>
>>>>>>>> What would be the ideal number for this setting? Is it based on the
>>>>>>>> hardware of the cluster?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Gavin
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <be...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>    - I assume your parquet files are compressed. Gzip or Snappy?
>>>>>>>>>    - What Spark version did you use? It seems to be at least 1.4. If
>>>>>>>>>    you use spark-sql and Tungsten, you might get better performance. But
>>>>>>>>>    Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just
>>>>>>>>>    for a simple group-by and aggregate.
>>>>>>>>>    - Did you use Kryo serialization?
>>>>>>>>>    - You should have spark.shuffle.compress=true; verify it.
>>>>>>>>>    - How many tasks did you use? spark.default.parallelism=?
>>>>>>>>>    - What about this:
>>>>>>>>>       - Read the data day by day
>>>>>>>>>       - compute a bucket id from timestamp, e.g., the date and
>>>>>>>>>       hour
>>>>>>>>>       - Write into different buckets (you probably need a special
>>>>>>>>>       writer to write data efficiently without shuffling the data).
>>>>>>>>>       - distinct for each bucket. Because each bucket is small,
>>>>>>>>>       spark can get it done faster than having everything in one run.
>>>>>>>>>       - I think using groupBy (userId, timestamp) might be better
>>>>>>>>>       than distinct. I guess distinct() will compare every field.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> And the most frequent operation I am gonna do is find the UserIDs
>>>>>>>>>> that have some events, then retrieve all the events associated with each
>>>>>>>>>> UserID.
>>>>>>>>>>
>>>>>>>>>> In this case, how should I partition to speed up the process?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuanyuan@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> hey Ted,
>>>>>>>>>>>
>>>>>>>>>>> The Event table looks like this: UserID, EventType, EventKey,
>>>>>>>>>>> TimeStamp, MetaData. I just parse it from JSON and save it as Parquet; I did
>>>>>>>>>>> not change the partitioning.
>>>>>>>>>>>
>>>>>>>>>>> Annoyingly, each day's incoming Event data has duplicates
>>>>>>>>>>> of the other days' data. The same event could show up in Day1 and Day2 and
>>>>>>>>>>> probably Day3.
>>>>>>>>>>>
>>>>>>>>>>> I only want to keep a single Event table, and each day brings so
>>>>>>>>>>> many duplicates.
>>>>>>>>>>>
>>>>>>>>>>> Is there a way I could just insert into Parquet and, if a duplicate
>>>>>>>>>>> is found, ignore it?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Gavin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is your Parquet data source partitioned by date ?
>>>>>>>>>>>>
>>>>>>>>>>>> Can you dedup within partitions ?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <
>>>>>>>>>>>> yue.yuanyuan@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I tried it on three days' data. The total input is only 980GB,
>>>>>>>>>>>>> but the shuffle write is about 6.2TB; the job then failed during the
>>>>>>>>>>>>> shuffle read step, which should be another 6.2TB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think shuffling cannot be avoided when deduplicating. Is there
>>>>>>>>>>>>> anything I could do to stabilize this process?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <
>>>>>>>>>>>>> yue.yuanyuan@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have an Event table for each day and want to merge them into a
>>>>>>>>>>>>>> single Event table. But there are so many duplicates across the days' data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>>>>>>>>> parquet file").
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Each day's Events are stored in their own Parquet file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But it failed at stage 2, which keeps losing its connection to
>>>>>>>>>>>>>> one executor. I guess this is due to a memory issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any suggestions on how to do this efficiently?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Gavin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Ayan Guha
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
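
A bit of arithmetic on the numbers quoted above may help with the spark.sql.shuffle.partitions question. With the default of 200 partitions, a 6.2TB shuffle means each reducer task has to pull a very large slice, which fits the lost-executor symptom. The sketch below is plain Python, not a Spark API. The 128 MiB per-task target is a rule-of-thumb assumption and not something stated in the thread, and 6.2TB is read as decimal terabytes.

```python
def per_reducer_bytes(shuffle_bytes, num_partitions):
    """Average amount of shuffle data each reducer task has to read."""
    return shuffle_bytes // num_partitions


def partitions_for_target(shuffle_bytes, target_bytes):
    """Smallest partition count keeping the average at or below target_bytes."""
    return -(-shuffle_bytes // target_bytes)  # ceiling division on integers


SHUFFLE_BYTES = 6_200_000_000_000  # the 6.2TB shuffle write reported in the thread
DEFAULT_PARTITIONS = 200           # Spark SQL default for spark.sql.shuffle.partitions
TARGET_BYTES = 128 * 1024 ** 2     # assumed per-task target, tune for the cluster

# 31000000000 bytes, i.e. about 31GB per reducer with the default setting
print(per_reducer_bytes(SHUFFLE_BYTES, DEFAULT_PARTITIONS))
# partition count that would bring the average down to the assumed target
print(partitions_for_target(SHUFFLE_BYTES, TARGET_BYTES))
```

So the number depends mostly on shuffle volume and on how much memory one task can use, and only indirectly on the hardware. The result would be applied with SET spark.sql.shuffle.partitions=[num_tasks]; as in the doc excerpt quoted above.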

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
I saw in the documentation that the value is LZO. Is it LZO or LZ4?

https://github.com/Cyan4973/lz4

Based on this benchmark, they differ quite a lot.




Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
gzip is relatively slow. It consumes a lot of CPU.

snappy is faster.

LZ4 is faster than GZIP and smaller than Snappy.

Cheers


Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
Thank you .

And speaking of compression, is there a big difference in performance between
gzip and snappy? And why is Parquet using gzip by default?

Thanks.



Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
Cycling old bits:
http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

Gavin:
Which release of hbase did you play with ?

HBase has been evolving and is getting more stable.

Cheers
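
To make the HBase suggestion quoted below a little more concrete: the appeal is that a write keyed by the event's identity is idempotent, so a repeated event replaces its earlier copy instead of adding a second row, and nothing has to be loaded first to check for duplicates. Here is a rough plain-Python sketch with a dict standing in for the table. The key layout and function names are assumptions for illustration, not an HBase API.

```python
def row_key(ev):
    """Build a composite key from the columns assumed to identify an event."""
    cols = ("UserID", "TimeStamp", "EventType", "EventKey")
    return "|".join(str(ev[c]) for c in cols)


def put_events(store, events):
    """Write events into `store`, a dict standing in for a key-value table.

    A repeated event maps to the same key and overwrites its earlier copy,
    so duplicates never accumulate. Returns how many keys were new.
    """
    new = 0
    for ev in events:
        k = row_key(ev)
        if k not in store:
            new += 1
        store[k] = ev
    return new


def events_for_user(store, user_id):
    """Return one user's events in key order, mimicking a prefix scan."""
    prefix = f"{user_id}|"
    return [ev for k, ev in sorted(store.items()) if k.startswith(prefix)]
```

Leading the key with UserID is a deliberate choice here: HBase keeps rows sorted by row key, so the "find all events for a UserID" lookup mentioned earlier in the thread would become a prefix scan rather than a full read.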

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yu...@gmail.com> wrote:

> I used to maintain an HBase cluster. The experience with it was not a happy one.
>
> I just tried querying each day's data first and deduplicating with the smaller
> set; the performance is acceptable. So I guess I will use this method.
>
> Again, could anyone give advice about:
>
>    - Automatically determine the number of reducers for joins and
>    groupbys: Currently in Spark SQL, you need to control the degree of
>    parallelism post-shuffle using “SET
>    spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> bq. in a NoSQL db such as HBase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:
>>
>>> One option you may want to explore is writing event table in an noSQL db
>>> such as Hbase. One inherent problem in your approach is you always need to
>>> load either full data set or a defined number of partitions to see if the
>>> event has already come (and no gurantee it is full proof, but lead to
>>> unnecessary loading in most cases).
>>>
>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yu...@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>> Thank you for the answer. I checked the setting you mentioend they are
>>>> all correct.  I noticed that in the job, there are always only 200 reducers
>>>> for shuffle read, I believe it is setting in the sql shuffle parallism.
>>>>
>>>> In the doc, it mentions:
>>>>
>>>>    - Automatically determine the number of reducers for joins and
>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>    parallelism post-shuffle using “SET
>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>
>>>>
>>>> What would be the ideal number for this setting? Is it based on the
>>>> hardware of cluster?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Gavin
>>>>
>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <be...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>>    - I assume your parquet files are compressed. Gzip or Snappy?
>>>>>    - What spark version did you use? It seems at least 1.4. If you
>>>>>    use spark-sql and tungsten, you might have better performance. but spark
>>>>>    1.5.2 gave me a wrong result when the data was about 300~400GB, just for a
>>>>>    simple group-by and aggregate.
>>>>>    - Did you use kyro serialization?
>>>>>    - you should have spark.shuffle.compress=true, verify it.
>>>>>    - How many tasks did you use? spark.default.parallelism=?
>>>>>    - What about this:
>>>>>       - Read the data day by day
>>>>>       - compute a bucket id from timestamp, e.g., the date and hour
>>>>>       - Write into different buckets (you probably need a special
>>>>>       writer to write data efficiently without shuffling the data).
>>>>>       - distinct for each bucket. Because each bucket is small, spark
>>>>>       can get it done faster than having everything in one run.
>>>>>       - I think using groupBy (userId, timestamp) might be better
>>>>>       than distinct. I guess distinct() will compare every field.
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And the most frequent operation I am gonna do is find the UserID who
>>>>>> have some events, then retrieve all the events associted with the UserID.
>>>>>>
>>>>>> In this case, how should I partition to speed up the process?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yu...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hey Ted,
>>>>>>>
>>>>>>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>>>>>>> MetaData.  I just parse it from Json and save as Parquet, did not change
>>>>>>> the partition.
>>>>>>>
>>>>>>> Annoyingly, every day's incoming Event data having duplicates among
>>>>>>> each other.  One same event could show up in Day1 and Day2 and probably
>>>>>>> Day3.
>>>>>>>
>>>>>>> I only want to keep single Event table and each day it come so many
>>>>>>> duplicates.
>>>>>>>
>>>>>>> Is there a way I could just insert into Parquet and if duplicate
>>>>>>> found, just ignore?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Gavin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Is your Parquet data source partitioned by date ?
>>>>>>>>
>>>>>>>> Can you dedup within partitions ?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I tried on Three day's data.  The total input is only 980GB, but
>>>>>>>>> the shuffle write Data is about 6.2TB, then the job failed during shuffle
>>>>>>>>> read step, which should be another 6.2TB shuffle read.
>>>>>>>>>
>>>>>>>>> I think to Dedup, the shuffling can not be avoided. Is there
>>>>>>>>> anything I could do to stablize this process?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey,
>>>>>>>>>>
>>>>>>>>>> I got everyday's Event table and want to merge them into a single
>>>>>>>>>> Event table. But there so many duplicates among each day's data.
>>>>>>>>>>
>>>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>>>
>>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new
>>>>>>>>>> parquet file").
>>>>>>>>>>
>>>>>>>>>> Each day's Event is stored in their own Parquet file
>>>>>>>>>>
>>>>>>>>>> But it failed at the stage2 which keeps losing connection to one
>>>>>>>>>> executor. I guess this is due to the memory issue.
>>>>>>>>>>
>>>>>>>>>> Any suggestion how I do this efficiently?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Gavin
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>

Re: How to merge two large table and remove duplicates?

Posted by Benyi Wang <be...@gmail.com>.
Just try setting spark.sql.shuffle.partitions to 1000, or even 2000, to see
if it works. If you see something like "Lost Executor", you had better stop
your job; otherwise you are wasting time. Usually the container of the lost
executor was killed by the NodeManager because there was not enough memory.
You can check the NodeManager's log to confirm it.

A couple of parameters may affect shuffle performance:

--num-executors    use a larger number, e.g., 2 x #data nodes
--executor-cores   use a small number, 3 or 4
--executor-memory  #cores x (memory for one core)

Also increase spark.shuffle.memoryFraction.

With a larger spark.sql.shuffle.partitions, each partition (task) will be
smaller and fit in the memory available to one core. If you use too many
partitions, the performance might get worse. You have to experiment based on
your cluster's nodes and memory.
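
The rules of thumb above can be turned into concrete spark-submit arguments.
The following is only a sketch: the function name, the 20-node cluster and the
4 GB-per-core budget are hypothetical, and the right values depend on your
cluster.

```python
def suggest_spark_submit_args(data_nodes, memory_per_core_gb,
                              executor_cores=4, shuffle_partitions=2000):
    """Apply the rules of thumb from this message (hypothetical helper)."""
    num_executors = 2 * data_nodes                             # 2 x #data nodes
    executor_memory_gb = executor_cores * memory_per_core_gb   # #cores x memory per core
    return [
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", f"{executor_memory_gb}g",
        "--conf", f"spark.sql.shuffle.partitions={shuffle_partitions}",
    ]


# Hypothetical cluster: 20 data nodes, 4 GB of memory budgeted per core.
print(" ".join(suggest_spark_submit_args(data_nodes=20, memory_per_core_gb=4)))
# prints: --num-executors 40 --executor-cores 4 --executor-memory 16g --conf spark.sql.shuffle.partitions=2000
```

Keeping the core count small limits how many tasks share one executor's
memory, which is the point of the "memory for one core" rule.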

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yu...@gmail.com> wrote:

> I used to maintain an HBase cluster, and the experience was not a happy one.
>
> I just tried querying the data from each day first and deduplicating the
> smaller set; the performance is acceptable, so I guess I will use this method.
>
> Again, could anyone give advice about:
>
>    - Automatically determine the number of reducers for joins and
>    groupbys: Currently in Spark SQL, you need to control the degree of
>    parallelism post-shuffle using “SET
>    spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
I used to maintain an HBase cluster, and the experience was not a happy one.

I just tried querying the data from each day first and deduplicating the
smaller set; the performance is acceptable, so I guess I will use this method.

Again, could anyone give advice about:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

Thanks.

Gavin




On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yu...@gmail.com> wrote:

> bq. in a NoSQL db such as HBase
>
> +1 :-)

Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
bq. in a NoSQL db such as HBase

+1 :-)


On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <gu...@gmail.com> wrote:

> One option you may want to explore is writing the event table to a NoSQL db
> such as HBase. One inherent problem in your approach is that you always need
> to load either the full data set or a defined number of partitions to see
> whether the event has already arrived (and there is no guarantee that this is
> foolproof, while it leads to unnecessary loading in most cases).

Re: How to merge two large table and remove duplicates?

Posted by ayan guha <gu...@gmail.com>.
One option you may want to explore is writing the event table to a NoSQL db
such as HBase. One inherent problem in your approach is that you always need
to load either the full data set or a defined number of partitions to see
whether the event has already arrived (and there is no guarantee that this is
foolproof, while it leads to unnecessary loading in most cases).
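
To illustrate why a key-value store sidesteps the reload problem, here is a
minimal sketch in which a plain Python dict stands in for the HBase table. It
assumes, hypothetically, that UserID, EventType, EventKey and TimeStamp
together identify an event; the helper names are made up for the example.

```python
def row_key(event):
    """Build a deterministic row key from the identifying fields (assumed)."""
    fields = ("UserID", "EventType", "EventKey", "TimeStamp")
    return "|".join(str(event[f]) for f in fields)


def upsert_events(table, events):
    """Write each event under its row key; a repeated event overwrites itself."""
    for event in events:
        table[row_key(event)] = event
    return table


day1 = [
    {"UserID": 1, "EventType": "click", "EventKey": "a", "TimeStamp": 100, "MetaData": "x"},
    {"UserID": 2, "EventType": "view", "EventKey": "b", "TimeStamp": 101, "MetaData": "y"},
]
day2 = [
    {"UserID": 1, "EventType": "click", "EventKey": "a", "TimeStamp": 100, "MetaData": "x"},  # repeat
    {"UserID": 3, "EventType": "click", "EventKey": "c", "TimeStamp": 200, "MetaData": "z"},
]

table = {}
upsert_events(table, day1)
upsert_events(table, day2)
print(len(table))  # prints: 3
```

Because the write is keyed, a new day's data never has to be compared against
the existing table. Putting UserID first in the key would also keep one user's
events adjacent in a store that sorts rows by key, as HBase does.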

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yu...@gmail.com> wrote:

> Hey,
> Thank you for the answer. I checked the settings you mentioned, and they are
> all correct.  I noticed that the job always uses only 200 reducers for the
> shuffle read; I believe this comes from the SQL shuffle parallelism setting.
>
> In the doc, it mentions:
>
>    - Automatically determine the number of reducers for joins and
>    groupbys: Currently in Spark SQL, you need to control the degree of
>    parallelism post-shuffle using “SET
>    spark.sql.shuffle.partitions=[num_tasks];”.
>
>
> What would be the ideal number for this setting? Is it based on the
> cluster's hardware?
>
>
> Thanks,
>
> Gavin


-- 
Best Regards,
Ayan Guha

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
Hey,
Thank you for the answer. I checked the settings you mentioned, and they are
all correct.  I noticed that the job always uses only 200 reducers for the
shuffle read; I believe this comes from the SQL shuffle parallelism setting.

In the doc, it mentions:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.


What would be the ideal number for this setting? Is it based on the
cluster's hardware?


Thanks,

Gavin
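
A rough way to reason about this setting is to divide the shuffle volume by
the number of reducers. The sketch below uses the 6.2TB shuffle write reported
earlier in this thread; the 2 GB per-partition target is purely an assumption
for illustration, and shuffle data is usually compressed on disk, so treat the
numbers as orders of magnitude only.

```python
GB = 10**9


def per_partition_gb(shuffle_bytes, num_partitions):
    """Average shuffle data handled by one reducer task, in GB."""
    return shuffle_bytes / num_partitions / GB


def partitions_needed(shuffle_bytes, target_partition_bytes):
    """Smallest partition count that keeps the average at or below the target."""
    return -(-shuffle_bytes // target_partition_bytes)  # integer ceiling division


shuffle_bytes = 6200 * GB  # the 6.2TB shuffle write reported in this thread

print(per_partition_gb(shuffle_bytes, 200))      # prints: 31.0 (default of 200 reducers)
print(per_partition_gb(shuffle_bytes, 2000))     # prints: 3.1
print(partitions_needed(shuffle_bytes, 2 * GB))  # prints: 3100 (hypothetical 2 GB target)
```

With the default of 200, each reducer would have to pull about 31 GB, which
fits the memory-pressure symptoms described in the thread; what a task can
actually hold depends on executor memory and cores.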

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <be...@gmail.com> wrote:

>
>    - I assume your Parquet files are compressed. Gzip or Snappy?
>    - What Spark version did you use? It seems to be at least 1.4. If you use
>    spark-sql and Tungsten, you might get better performance, but Spark 1.5.2
>    gave me a wrong result when the data was about 300~400GB, just for a simple
>    group-by and aggregate.
>    - Did you use Kryo serialization?
>    - You should have spark.shuffle.compress=true; verify it.
>    - How many tasks did you use? spark.default.parallelism=?
>    - What about this:
>       - Read the data day by day.
>       - Compute a bucket id from the timestamp, e.g., the date and hour.
>       - Write into different buckets (you probably need a special writer
>       to write data efficiently without shuffling the data).
>       - Run distinct for each bucket. Because each bucket is small, Spark can
>       get it done faster than having everything in one run.
>       - I think using groupBy (userId, timestamp) might be better than
>       distinct. I guess distinct() will compare every field.
>
>
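
The bucketed approach quoted above can be sketched in plain Python to show the
logic, independent of Spark. Everything here is illustrative: the function
names are made up, TimeStamp is assumed to be epoch seconds in UTC, and
(UserID, TimeStamp) is assumed to identify an event, as the groupBy suggestion
implies.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_id(ts):
    """Bucket by date and hour, e.g. '2016-01-08-14' (ts is epoch seconds, UTC)."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d-%H")


def dedup_by_bucket(days):
    """Route events into hourly buckets, then deduplicate each bucket on its own."""
    buckets = defaultdict(list)
    for day in days:                       # read the data day by day
        for event in day:
            buckets[bucket_id(event["TimeStamp"])].append(event)

    result = []
    for bid in sorted(buckets):            # each bucket is small and independent
        seen = set()
        for event in buckets[bid]:
            key = (event["UserID"], event["TimeStamp"])  # assumed event identity
            if key not in seen:
                seen.add(key)
                result.append(event)
    return result


day1 = [{"UserID": 1, "TimeStamp": 1452261600}, {"UserID": 2, "TimeStamp": 1452265200}]
day2 = [{"UserID": 1, "TimeStamp": 1452261600}, {"UserID": 3, "TimeStamp": 1452265200}]
print(len(dedup_by_bucket([day1, day2])))  # prints: 3
```

Duplicates carry the same timestamp, so they always land in the same bucket;
deduplicating bucket by bucket therefore gives the same result as one global
pass while keeping each unit of work small.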

Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
Benyi:

bq. spark 1.5.2 gave me a wrong result when the data was about 300~400GB,
just for a simple group-by and aggregate

Can you reproduce the above using Spark 1.6.0?

Thanks

On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <be...@gmail.com> wrote:

>
>    - I assume your Parquet files are compressed. Gzip or Snappy?
>    - What Spark version did you use? It seems to be at least 1.4. If you use
>    spark-sql and Tungsten, you might get better performance, but Spark 1.5.2
>    gave me a wrong result when the data was about 300~400GB, just for a simple
>    group-by and aggregate.
>    - Did you use Kryo serialization?
>    - You should have spark.shuffle.compress=true; verify it.
>    - How many tasks did you use? spark.default.parallelism=?
>    - What about this:
>       - Read the data day by day.
>       - Compute a bucket id from the timestamp, e.g., the date and hour.
>       - Write into different buckets (you probably need a special writer
>       to write data efficiently without shuffling the data).
>       - Run distinct for each bucket. Because each bucket is small, Spark can
>       get it done faster than having everything in one run.
>       - I think using groupBy (userId, timestamp) might be better than
>       distinct. I guess distinct() will compare every field.
>

Re: How to merge two large table and remove duplicates?

Posted by Benyi Wang <be...@gmail.com>.
   - I assume your Parquet files are compressed. Gzip or Snappy?
   - Which Spark version did you use? It seems to be at least 1.4. If you use
   Spark SQL with Tungsten, you might get better performance, but Spark 1.5.2
   gave me a wrong result when the data was about 300~400GB, just for a simple
   group-by and aggregate.
   - Did you use Kryo serialization?
   - You should have spark.shuffle.compress=true; verify it.
   - How many tasks did you use? spark.default.parallelism=?
   - What about this:
      - Read the data day by day.
      - Compute a bucket id from the timestamp, e.g., the date and hour.
      - Write into different buckets (you probably need a special writer to
      write data efficiently without shuffling the data).
      - Run distinct for each bucket. Because each bucket is small, Spark can
      get it done faster than having everything in one run.
      - I think using groupBy (userId, timestamp) might be better than
      distinct. I guess distinct() will compare every field.
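
The bucket idea in the list above can be sketched in plain Python, without Spark, just to show the logic. This is only an illustration under assumptions: events are dicts with the UserID and TimeStamp fields from this thread, TimeStamp is taken to be epoch seconds, and (UserID, TimeStamp) is assumed to identify an event uniquely. The helper names bucket_id and dedup_by_bucket are made up for this sketch.

```python
from collections import defaultdict
from datetime import datetime, timezone


def bucket_id(ts_seconds):
    """Bucket an event by UTC date and hour, e.g. '2016-01-08-14'."""
    return datetime.fromtimestamp(ts_seconds, tz=timezone.utc).strftime("%Y-%m-%d-%H")


def dedup_by_bucket(days):
    """Route events into date-hour buckets, then dedup each bucket on its own.

    Two events count as duplicates when (UserID, TimeStamp) match. That key is
    an assumption; widen it if it is not unique in the real data.
    """
    buckets = defaultdict(list)
    for day in days:                      # read the data day by day
        for event in day:
            buckets[bucket_id(event["TimeStamp"])].append(event)

    result = {}
    for bid, events in buckets.items():   # each bucket is small and independent
        seen = {}
        for event in events:
            key = (event["UserID"], event["TimeStamp"])
            seen.setdefault(key, event)   # keep the first copy, ignore repeats
        result[bid] = list(seen.values())
    return result


day1 = [
    {"UserID": "u1", "EventType": "click", "TimeStamp": 1452261600},
    {"UserID": "u2", "EventType": "view", "TimeStamp": 1452265200},
]
day2 = [
    {"UserID": "u1", "EventType": "click", "TimeStamp": 1452261600},  # repeat of day1
    {"UserID": "u3", "EventType": "click", "TimeStamp": 1452265300},
]
merged = dedup_by_bucket([day1, day2])
total = sum(len(events) for events in merged.values())
```

Because duplicates of the same event share a timestamp, they always land in the same bucket, so deduplicating bucket by bucket gives the same result as one global pass while keeping each unit of work small. Comparing only the key columns also avoids comparing every field, which is the point of the groupBy suggestion.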


On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yu...@gmail.com> wrote:

> And the most frequent operation I am going to do is find the UserIDs that have
> some events, then retrieve all the events associated with each UserID.
>
> In this case, how should I partition to speed up the process?
>
> Thanks.

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
And the most frequent operation I am going to do is find the UserIDs that have
some events, then retrieve all the events associated with each UserID.

In this case, how should I partition to speed up the process?

Thanks.

On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yu...@gmail.com> wrote:

> Hey Ted,
>
> The Event table is like this: UserID, EventType, EventKey, TimeStamp,
> MetaData.  I just parse it from JSON and save it as Parquet; I did not change
> the partitioning.
>
> Annoyingly, every day's incoming Event data has duplicates of events from
> other days.  The same event could show up in Day1 and Day2 and probably Day3.
>
> I only want to keep a single Event table, and each day brings so many
> duplicates.
>
> Is there a way I could just insert into Parquet and, if a duplicate is found,
> just ignore it?
>
> Thanks,
> Gavin

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
Hey Ted,

The Event table is like this: UserID, EventType, EventKey, TimeStamp,
MetaData.  I just parse it from JSON and save it as Parquet; I did not change
the partitioning.

Annoyingly, every day's incoming Event data has duplicates of events from
other days.  The same event could show up in Day1 and Day2 and probably Day3.

I only want to keep a single Event table, and each day brings so many
duplicates.

Is there a way I could just insert into Parquet and, if a duplicate is found,
just ignore it?

Thanks,
Gavin

On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yu...@gmail.com> wrote:

> Is your Parquet data source partitioned by date?
>
> Can you dedup within partitions?
>
> Cheers

Re: How to merge two large table and remove duplicates?

Posted by Ted Yu <yu...@gmail.com>.
Is your Parquet data source partitioned by date?

Can you dedup within partitions?

Cheers

On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yu...@gmail.com> wrote:

> I tried it on three days' data.  The total input is only 980GB, but the
> shuffle write is about 6.2TB. The job then failed during the shuffle read
> step, which should be another 6.2TB of shuffle read.
>
> I think that to dedup, the shuffling cannot be avoided. Is there anything I
> could do to stabilize this process?
>
> Thanks.

Re: How to merge two large table and remove duplicates?

Posted by Gavin Yue <yu...@gmail.com>.
I tried it on three days' data.  The total input is only 980GB, but the
shuffle write is about 6.2TB. The job then failed during the shuffle read
step, which should be another 6.2TB of shuffle read.
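
For scale, here is a quick back-of-the-envelope check of those figures. It is only a sketch: it assumes decimal units (1TB = 1000GB), which the numbers above do not specify. Some expansion is to be expected, since Parquet files are stored compressed and column-encoded while shuffle data is serialized row by row.

```python
input_gb = 980.0            # total Parquet input for three days, as reported
shuffle_write_tb = 6.2      # reported shuffle write
GB_PER_TB = 1000.0          # assumption: decimal units

shuffle_write_gb = shuffle_write_tb * GB_PER_TB
amplification = shuffle_write_gb / input_gb   # shuffle bytes per input byte
total_shuffle_tb = 2 * shuffle_write_tb       # the write plus the matching read

print(f"shuffle amplification: {amplification:.1f}x")
print(f"data moved through shuffle: {total_shuffle_tb:.1f} TB")
```

So each byte of input turns into roughly six bytes of shuffle data, and the full job has to move about twice the shuffle write through the cluster.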

I think that to dedup, the shuffling cannot be avoided. Is there anything I
could do to stabilize this process?

Thanks.


On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yu...@gmail.com> wrote:

> Hey,
>
> I have an Event table for each day and want to merge them into a single Event
> table. But there are so many duplicates among each day's data.
>
> I use Parquet as the data source.  What I am doing now is
>
> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>
> Each day's Events are stored in their own Parquet file.
>
> But it failed at stage 2, which keeps losing its connection to one executor.
> I guess this is due to a memory issue.
>
> Any suggestions on how I can do this efficiently?
>
> Thanks,
> Gavin
>