Posted to dev@spark.apache.org by Piyush Mukati <pi...@gmail.com> on 2017/10/25 16:52:57 UTC

Structured Stream equivalent of reduceByKey

Hi,
We are migrating some jobs from DStreams to Structured Streaming.

Currently, to handle aggregations, we call map and reduceByKey on each RDD,
like:
rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))

The final output of each RDD is merged into the sink, which itself supports
aggregation (like a coprocessor in HBase).
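
For context, the current DStream job is roughly the sketch below. This is
illustrative only: the Event type alias and the merge / writeToSink
parameters stand in for our own code.

import org.apache.spark.streaming.dstream.DStream

type Event = (String, Long)   // stand-in for our real event type

def aggregateAndWrite(stream: DStream[Event],
                      merge: (Event, Event) => Event,
                      writeToSink: Iterator[(String, Event)] => Unit): Unit = {
  stream.foreachRDD { rdd =>
    rdd.map(event => (event._1, event))        // key by the first field
       .reduceByKey((a, b) => merge(a, b))     // aggregate within this micro-batch only
       .foreachPartition(writeToSink)          // the sink merges these partial results itself
  }
}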

In the new Dataset API, I am not finding any suitable way to aggregate over
just the micro-batch.
Most of the aggregation APIs use the state store and provide global
aggregations (and with append mode they do not emit the change to existing
buckets).
Problems we are suspecting:
 1) The state store is tightly coupled to the job definition, while in our
case we may want to edit the job while keeping the previously calculated
aggregates as they are.

The desired result can be achieved with the Dataset APIs below:
dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
but on inspecting the physical plan, there is no merge before the sort (i.e.
no map-side combine, unlike reduceByKey).

 Is anyone aware of an API or other workaround to get the desired result?

Re: Structured Stream equivalent of reduceByKey

Posted by Michael Armbrust <mi...@databricks.com>.
Hmmm, I see.  You could probably output the delta using flatMapGroupsWithState
<https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->.
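
Something along these lines (a rough, untested sketch; the (String, Long)
event shape, the Delta class, and the events / deltas names are made up)
keeps the running count in GroupState but emits only the per-trigger
increment. If the running total is never needed, the state update could be
dropped as well:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Delta(key: String, delta: Long)

// events: Dataset[(String, Long)]; assumes `import spark.implicits._` is in scope.
val deltas = events
  .groupByKey(_._1)
  .flatMapGroupsWithState[Long, Delta](OutputMode.Update, GroupStateTimeout.NoTimeout) {
    (key, values, state) =>
      val increment = values.size.toLong                       // rows for this key in this trigger
      state.update(state.getOption.getOrElse(0L) + increment)  // keep the running total in state
      Iterator(Delta(key, increment))                          // emit only the delta, not the total
  }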

Re: Structured Stream equivalent of reduceByKey

Posted by Piyush Mukati <pi...@gmail.com>.
Thanks, Michael.
I have explored Aggregator
<https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
with update mode. The problem is that it emits the overall aggregated value
for each changed group, while I only want the delta for the current batch,
since we also aggregate at the sink. For example, if a key already has a
count of 10 and 3 new events arrive, update mode emits 13, whereas the sink
only needs the increment of 3.

Below is the plan generated with a count Aggregator.

*HashAggregate
StateStoreSave
*HashAggregate,
StateStoreRestore
*HashAggregate,
Exchange
*HashAggregate,
*Project
StreamingRelation

We are looking for some aggregation that avoids these state-store
interactions (the StateStoreRestore / StateStoreSave steps in the plan above).

Also, is anyone aware of a design doc or example of how to add a new
operation on Dataset and its corresponding physical plan?
Re: Structured Stream equivalent of reduceByKey

Posted by Michael Armbrust <mi...@databricks.com>.
- dev

I think you should be able to write an Aggregator
<https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
You probably want to run in update mode if you are looking for it to output
any group that has changed in the batch.
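
For example, something like this rough sketch (untested; the (String, Long)
event type and the countAgg / events / totals names are just illustrative):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Counts events per key; buffer and output are both Long.
val countAgg = new Aggregator[(String, Long), Long, Long] {
  def zero: Long = 0L
  def reduce(buf: Long, event: (String, Long)): Long = buf + 1
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// events: Dataset[(String, Long)]; assumes `import spark.implicits._` is in scope.
val totals = events.groupByKey(_._1).agg(countAgg.toColumn)

// With outputMode("update") the sink sees the new total for every key that
// changed in the trigger, e.g.:
// totals.writeStream.outputMode("update").format(...).start()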
