Posted to user@spark.apache.org by qihong <qc...@pivotal.io> on 2014/09/10 05:03:08 UTC

how to setup steady state stream partitions

I'm working on a DStream application. The input is sensor measurements,
in the format <sensor id><timestamp><measure>.

There are 10 thousand sensors, and updateStateByKey is used to maintain
the state of each sensor. The code looks like the following:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)

Here's the question:
In a cluster with 10 worker nodes, is it possible to partition the input
DStream so that node 1 handles sensors 0-999, node 2 handles sensors 1000-1999,
and so on?

Also, is it possible to keep the state stream for sensors 0-999 on node 1,
1000-1999 on node 2, and so on? Right now I see the sensor state stream being
shuffled for every batch, which uses a lot of network bandwidth unnecessarily.

Any suggestions?

Thanks! 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: how to setup steady state stream partitions

Posted by qihong <qc...@pivotal.io>.
Thanks for your response! I found that too, and it does the trick! Here's
the refined code:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream =
  keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream =
  partitionedDStream.updateStateByKey[...](updateFunction, new MyPartitioner(...))
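
The thread never shows MyPartitioner itself. A minimal sketch of what such a partitioner could look like follows, assuming the keys are Int sensor IDs in a known range. The name SensorRangePartitioner and both constructor parameters are hypothetical, and the block needs spark-core on the classpath.

```scala
import org.apache.spark.Partitioner

// Hypothetical stand-in for MyPartitioner. Assumes keys are Int sensor IDs
// in [0, numSensors); the real implementation is not shown in the thread.
case class SensorRangePartitioner(numSensors: Int, numPartitions: Int)
    extends Partitioner {
  require(numSensors > 0 && numPartitions > 0, "sizes must be positive")

  // Number of consecutive sensor IDs assigned to each partition, rounded up.
  private val sensorsPerPartition: Int =
    (numSensors + numPartitions - 1) / numPartitions

  // Map a sensor ID to the partition that owns its contiguous ID range.
  override def getPartition(key: Any): Int = key match {
    case id: Int if id >= 0 && id < numSensors => id / sensorsPerPartition
    case other =>
      throw new IllegalArgumentException(s"unexpected sensor key: $other")
  }
}
```

With SensorRangePartitioner(10000, 10), IDs 0-999 map to partition 0, 1000-1999 to partition 1, and so on. Writing it as a case class means two instances built with the same arguments compare equal. That probably matters here, because the code above creates the partitioner twice and Spark compares partitioners for equality when deciding whether data is already laid out as required.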





RE: how to setup steady state stream partitions

Posted by Anton Brazhnyk <an...@genesys.com>.
Just a guess.
updateStateByKey has overloaded variants that take a partitioner as a parameter. Could that help?
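
For reference, the overload in question takes the update function plus a Partitioner. The thread does not show the update function, so the sketch below only illustrates the shape it needs to have. SensorState, its fields, and the Double measurement type are assumptions made for illustration.

```scala
object SensorUpdate {
  // Hypothetical state type; the thread never shows the real one.
  final case class SensorState(count: Long, lastMeasure: Double)

  // Shape expected by updateStateByKey: the new values for one key in this
  // batch, plus the previous state (None the first time the key is seen).
  def updateFunction(newMeasures: Seq[Double],
                     previous: Option[SensorState]): Option[SensorState] =
    if (newMeasures.isEmpty) previous // no data this batch: keep the state
    else {
      val count = previous.map(_.count).getOrElse(0L) + newMeasures.size
      Some(SensorState(count, newMeasures.last))
    }

  // With Spark Streaming on the classpath, the partitioner overload would be
  // called roughly like this (myPartitioner: org.apache.spark.Partitioner):
  //   keyedDStream.updateStateByKey[SensorState](updateFunction _, myPartitioner)
}
```

Returning None from the update function drops the key from the state, so this sketch returns the previous state unchanged when a sensor has no data in a batch.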

-----Original Message-----
From: qihong [mailto:qchen@pivotal.io] 
Sent: Tuesday, September 09, 2014 9:13 PM
To: user@spark.incubator.apache.org
Subject: Re: how to setup steady state stream partitions

Thanks for your response. I do have something like:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream =
  keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction)

The partitionedDStream does have steady partitions, but stateDStream does not. For example, partition 0 of partitionedDStream only holds data for sensors 0 to 999, but partition 0 of stateDStream holds data for some sensors in the 0 to 999 range plus many sensors from other partitions of partitionedDStream.

I would like partition 0 of stateDStream to contain only the data from partition 0 of partitionedDStream, partition 1 of stateDStream only the data from partition 1 of partitionedDStream, and so on. Does anyone know how to implement that?

Thanks!





Re: how to setup steady state stream partitions

Posted by qihong <qc...@pivotal.io>.
Thanks for your response. I do have something like:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream =
  keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction)

The partitionedDStream does have steady partitions, but stateDStream does
not. For example, partition 0 of partitionedDStream only holds data for
sensors 0 to 999, but partition 0 of stateDStream holds data for some
sensors in the 0 to 999 range plus many sensors from other partitions of
partitionedDStream.

I would like partition 0 of stateDStream to contain only the data from
partition 0 of partitionedDStream, partition 1 of stateDStream only the
data from partition 1 of partitionedDStream, and so on. Does anyone know
how to implement that?

Thanks!





Re: how to setup steady state stream partitions

Posted by x <wa...@gmail.com>.
Did using your own partitioner not work?

e.g.
YourRDD.partitionBy(new HashPartitioner(your number))
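
One caveat with HashPartitioner for this use case: it assigns a key by taking its hashCode modulo the number of partitions, so Int sensor IDs are spread round-robin rather than in contiguous ranges. The plain-Scala sketch below mirrors that rule next to the range rule the question asks for. The object and function names are made up for illustration, and no Spark dependency is needed.

```scala
object PartitionLayout {
  // Same rule Spark's HashPartitioner applies: key.hashCode modulo the
  // number of partitions, adjusted so the result is never negative.
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Contiguous-range rule from the original question (Int IDs assumed).
  def rangePartition(sensorId: Int, sensorsPerPartition: Int): Int =
    sensorId / sensorsPerPartition
}
```

With 10 partitions, hashPartition sends sensors 0, 10, 20, ... to partition 0, while rangePartition(id, 1000) keeps sensors 0-999 together. A hash layout is still stable from batch to batch, so it may be good enough if contiguous ranges per node are not a hard requirement.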

xj @ Tokyo

On Wed, Sep 10, 2014 at 12:03 PM, qihong <qc...@pivotal.io> wrote:

> I'm working on a DStream application. The input is sensor measurements,
> in the format <sensor id><timestamp><measure>.
>
> There are 10 thousand sensors, and updateStateByKey is used to maintain
> the state of each sensor. The code looks like the following:
>
> val inputDStream = ...
> val keyedDStream = inputDStream.map(...)  // use sensorId as key
> val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)
>
> Here's the question:
> In a cluster with 10 worker nodes, is it possible to partition the input
> DStream so that node 1 handles sensors 0-999, node 2 handles sensors
> 1000-1999, and so on?
>
> Also, is it possible to keep the state stream for sensors 0-999 on node 1,
> 1000-1999 on node 2, and so on? Right now I see the sensor state stream
> being shuffled for every batch, which uses a lot of network bandwidth
> unnecessarily.
>
> Any suggestions?
>
> Thanks!