You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@kudu.apache.org by Дмитрий Павлов <dm...@inbox.ru> on 2018/11/15 09:02:51 UTC

Re[2]: How to load kudu RDD with correct partitioner

Hi Grant

I really appreciate for your reply

Let me explain my case with Kudu and Spark.  We a using Kudu with Spark in production to aggregate current and historical records.
 
So i have 2 tables:

Table A - which have following partition  schema:
HASH (record_id) PARTITIONS N,
RANGE (meet_date) (
    PARTITION 2018-11-15T00:00:00.000000Z <= VALUES < 2018-11-16T00:00:00.000000Z,
    PARTITION 2018-11-16T00:00:00.000000Z <= VALUES < 2018-11-17T00:00:00.000000Z,
    PARTITION 2018-11-17T00:00:00.000000Z <= VALUES < 2018-11-18T00:00:00.000000Z
) Table B 
HASH (record_id) PARTITIONS N,
RANGE (record_id) (
    PARTITION UNBOUNDED
) Table B contains historical information and Table contains current information flow splited by days. History table is big enough (~ 4 000 000 000 records)  

So i expect that records with same record_id from Table A and Table B will be stored on same kudu nodes.  (May be this expectation is unreasonable) Please correct me if i'm wrong.

Next i try to do following

RDDA = kuduRDD(Table A)
RDDB = kuduRDD(Table B)

RDDA join RDDB by record_id 

And here i do not expect any shuffling but it does. Spark knows about partition locality (getPrefferedLocation returns correct list of kudu locations). 
but looks like it does not know about original Kudu partitioning. So i have to add rdd.partitionBy(new HashPartitioner(N)) before join operation
and it leads to shuffling  
Regards, Dmitry

>Среда, 14 ноября 2018, 18:58 +03:00 от Grant Henke <gh...@cloudera.com>:
>
>Unfortunately, I am not sure of a simple way to provide the partitioner information with the existing implementation. Currently the KuduRDD does not override the  RDD partitioner , though it probably could as an improvement. 
>
>Would you like to file a Kudu jira to track the work? Would you be interested in contributing the improvement? 
>
>I am curious to know, how are you planning to use the knowledge of the original Kudu partitioning and how is it useful to your Spark workflow?
>
>Thanks,
>Grant
>
>
>
>On Wed, Nov 14, 2018 at 2:41 AM Dmitry Pavlov < dm.pavlov@inbox.ru > wrote:
>>Hi guys
>>
>>I have a question about Kudu with Spark.
>>
>>For example there is a table in kudu with field record_id and following partitioning:
>>HASH (record_id) PARTITIONS N
>>
>>Is it possible to load records from such table in key value fashion with correct partitioner information in RDD? For example RDD[(record_id, row)]
>>Because when i try to use kuduRDD in spark the partitioner has None value so im losing information about original (kudu) partitioning.
>>
>>Thanks
>
>-- 
>Grant Henke 
>Software Engineer | Cloudera
>grant@cloudera.com |  twitter.com/gchenke |  linkedin.com/in/granthenke

-- 
Дмитрий Павлов

Re: Re[2]: How to load kudu RDD with correct partitioner

Posted by Grant Henke <gh...@cloudera.com>.
>
> *So i expect that records with same record_id from Table A and Table B
> will be stored on same kudu nodes. *(May be this expectation is
> unreasonable) Please correct me if i'm wrong.
>

This isn't an expectation that Kudu satisfies. Table A and B are completely
separate tables and the tablet in each table that corresponds to a given
record_id is not necessarily co-located on the same tablet server. Even if
they happend to be, it wouldn't be a good idea to depend on it.


On Thu, Nov 15, 2018 at 3:03 AM Дмитрий Павлов <dm...@inbox.ru> wrote:

>
> Hi Grant
>
> I really appreciate for your reply
>
> Let me explain my case with Kudu and Spark.  We a using Kudu with Spark in
> production to aggregate current and historical records.
>
> So i have 2 tables:
>
> *Table A* - which have following partition  schema:
>
> HASH (record_id) PARTITIONS N,
> RANGE (meet_date) (
>     PARTITION 2018-11-15T00:00:00.000000Z <= VALUES < 2018-11-16T00:00:00.000000Z,
>     PARTITION 2018-11-16T00:00:00.000000Z <= VALUES < 2018-11-17T00:00:00.000000Z,
>     PARTITION 2018-11-17T00:00:00.000000Z <= VALUES < 2018-11-18T00:00:00.000000Z
> )
>
>
> *Table B *
>
> HASH (record_id) PARTITIONS N,
> RANGE (record_id) (
>     PARTITION UNBOUNDED
> )
>
> Table B contains historical information and Table contains current
> information flow splited by days. History table is big enough (~ 4 000 000
> 000 records)
>
> *So i expect that records with same record_id from Table A and Table B
> will be stored on same kudu nodes. *(May be this expectation is
> unreasonable) Please correct me if i'm wrong.
>
> Next i try to do following
>
> RDDA = kuduRDD(Table A)
> RDDB = kuduRDD(Table B)
>
> RDDA join RDDB by record_id
>
> And here i do not expect any shuffling but it does. Spark knows about
> partition locality (getPrefferedLocation returns correct list of kudu
> locations).
> but looks like it does not know about original Kudu partitioning. So i
> have to add rdd.partitionBy(new HashPartitioner(N)) before join operation
> and it leads to shuffling
>
> Regards, Dmitry
>
> Среда, 14 ноября 2018, 18:58 +03:00 от Grant Henke <gh...@cloudera.com>:
>
> Unfortunately, I am not sure of a simple way to provide the partitioner
> information with the existing implementation. Currently the KuduRDD does
> not override the RDD partitioner
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L141>,
> though it probably could as an improvement.
>
> Would you like to file a Kudu jira to track the work? Would you be
> interested in contributing the improvement?
>
> I am curious to know, how are you planning to use the knowledge of the
> original Kudu partitioning and how is it useful to your Spark workflow?
>
> Thanks,
> Grant
>
>
>
> On Wed, Nov 14, 2018 at 2:41 AM Dmitry Pavlov <dm.pavlov@inbox.ru
> <ht...@inbox.ru>> wrote:
>
> Hi guys
>
> I have a question about Kudu with Spark.
>
> For example there is a table in kudu with field record_id and following
> partitioning:
> HASH (record_id) PARTITIONS N
>
> Is it possible to load records from such table in key value fashion with
> correct partitioner information in RDD? For example RDD[(record_id, row)]
> Because when i try to use kuduRDD in spark the partitioner has None value
> so im losing information about original (kudu) partitioning.
>
> Thanks
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> grant@cloudera.com
> <ht...@cloudera.com> |
> twitter.com/gchenke | linkedin.com/in/granthenke
>
>
>
> --
> Дмитрий Павлов
>


-- 
Grant Henke
Software Engineer | Cloudera
grant@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke