Posted to dev@spark.apache.org by Milos Nikolic <mi...@gmail.com> on 2014/08/26 10:01:00 UTC

Partitioning strategy changed in Spark 1.0.x?

Hi guys, 

I’ve noticed some changes in the behavior of partitioning under Spark 1.0.x.
I’d appreciate it if someone could explain what has changed in the meantime.

Here is a small example. I want to create two RDD[(K, V)] objects and then
collocate partitions with the same K on one node. Even when the same
partitioner is used for both RDDs, partitions with the same K end up on
different nodes.

    // Let's say I have 10 nodes 
    val partitioner = new HashPartitioner(10)     
    
    // Create RDD 
    val rdd = sc.parallelize(0 until 10).map(k => (k, computeValue(k))) 
    
    // Partition twice using the same partitioner 
    rdd.partitionBy(partitioner).foreach { case (k, v) => println("Dummy1 -> k = " + k) } 
    rdd.partitionBy(partitioner).foreach { case (k, v) => println("Dummy2 -> k = " + k) } 

The output on one node is: 
    Dummy1 -> k = 2 
    Dummy2 -> k = 7 
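
For completeness, here is a variant of the same test that also prints the
partition index and the host each task runs on. This is only a rough sketch,
not the run that produced the output above:

    // Variant of the test above: print the partition index and the executor
    // host for each key, instead of just the key.
    rdd.partitionBy(partitioner).mapPartitionsWithIndex { (idx, iter) =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      iter.map { case (k, _) => "partition " + idx + " on " + host + " -> k = " + k }
    }.collect().foreach(println)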

I was expecting to see the same keys on each node. That was happening under Spark 0.9.2, but not under Spark 1.0.x. 

Does anyone have an idea what has changed in the meantime? Or how to get the corresponding partitions onto the same node?

Thanks in advance,
Milos
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Fwd: Partitioning strategy changed in Spark 1.0.x?

Posted by Reynold Xin <rx...@databricks.com>.
Sending the response back to the dev list so this is indexable and
searchable by others.

---------- Forwarded message ----------
From: Milos Nikolic <mi...@gmail.com>
Date: Sat, Aug 30, 2014 at 5:50 PM
Subject: Re: Partitioning strategy changed in Spark 1.0.x?
To: Reynold Xin <rx...@databricks.com>


Thank you, your insights were very helpful, and we managed to find a
solution that works for us.

Best,
Milos


On Aug 27, 2014, at 11:20 PM, Reynold Xin <rx...@databricks.com> wrote:

I don't think you can ever rely on the mapping from data to physical nodes
in Spark, even in Spark 0.9. That is because the scheduler needs to be
fault-tolerant: what if a node is busy, or goes down?

What is guaranteed is that the partitioning of data is deterministic, i.e. a
given key is always hashed into the same partition (given the same partition
count). And if you don't run foreach twice, but instead simply zip the two
RDDs that are both hash-partitioned by the same partitioner, then the
scheduler will not create extra stages.
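
For the first point: HashPartitioner computes the partition index purely from
the key's hashCode and the number of partitions, so the key-to-partition
mapping is the same on every run and for every RDD that uses that partitioner.
A quick sketch to illustrate (nothing cluster-specific here, it is just the
partitioner object):

    import org.apache.spark.HashPartitioner

    // The partition index depends only on key.hashCode and the partition
    // count, so the same key always lands in the same partition index.
    val partitioner = new HashPartitioner(10)
    (0 until 10).foreach { k =>
      println("k = " + k + " -> partition " + partitioner.getPartition(k))
    }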

e.g., zipping the two RDDs instead of running foreach twice:

    // Let's say I have 10 nodes
    val partitioner = new HashPartitioner(10)

    // Create RDD
    val rdd = sc.parallelize(0 until 10).map(k => (k, computeValue(k)))

    // Partition twice using the same partitioner
    val p1 = rdd.partitionBy(partitioner)
    val p2 = rdd.partitionBy(partitioner)
    p1.zip(p2)   // <--- this should work
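
If you then want to do the per-partition work yourself, e.g. the local join
you have in mind, zipPartitions hands you the two iterators for the same
partition index. A rough sketch (assuming both sides are (K, V) pairs
partitioned by the same partitioner, and that one side fits in memory per
partition):

    // Sketch of a per-partition hash join: partition i of p1 and partition i
    // of p2 contain the same keys because both were built with the same
    // HashPartitioner.
    val joined = p1.zipPartitions(p2) { (iter1, iter2) =>
      val buildSide = iter1.toSeq.groupBy(_._1)   // build side held in memory
      iter2.flatMap { case (k, v2) =>
        buildSide.getOrElse(k, Seq.empty).map { case (_, v1) => (k, (v1, v2)) }
      }
    }

Note that zip also assumes the two RDDs have the same number of partitions
and the same number of elements per partition, so for join-style workloads
zipPartitions is usually the more flexible tool.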




On Wed, Aug 27, 2014 at 1:50 PM, Milos Nikolic <mi...@gmail.com>
wrote:

> Sure.
>
> Suppose we have two SQL relations, expressed as two RDDs, and we want to
> do a hash join between them. First, we would partition each RDD on the join
> key, which should collocate partitions with the same join key on one node.
> Then, we would zip the corresponding partitions of the two relations and do a
> local join on each node.
>
> This approach makes sense only if Spark always places key X on node Y for
> both RDDs, which no longer seems to be the case. I don't see how to work
> around this given the recent changes in hashing you mentioned.
>
> Milos
>
> On Aug 27, 2014, at 10:05 PM, Reynold Xin <rx...@databricks.com> wrote:
>
> Can you elaborate your problem?
>
> I am not sure I understand what you mean by "on one node, I get two
> different sets of keys".
>
>
> On Tue, Aug 26, 2014 at 2:16 AM, Milos Nikolic <mi...@gmail.com>
> wrote:
>
>> Hi Reynold,
>>
>> The problem still exists even with more elements. On one node, I get two
>> different sets of keys -- I want these local sets to be the same so that I
>> can zip local partitions together later on (rather than RDD.join them,
>> which involves shuffling).
>>
>> With this recent change in hashing, RDD.zip no longer seems useful, because
>> I can no longer guarantee that local partitions from two RDDs will share
>> the same set of keys on one node.
>>
>> Do you have any ideas on how to resolve this problem?
>>
>> Thanks,
>> Milos
>>
>>
>>
>> On Aug 26, 2014, at 10:04 AM, Reynold Xin <rx...@databricks.com> wrote:
>>
>> It is better to use a larger number of elements than just 10 for this test.
>>
>> Can you try something larger, like 1000 or 10000?
>>
>> IIRC, the hash function changed to murmur hash:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala#L205