Posted to user@spark.apache.org by Michael Armbrust <mi...@databricks.com> on 2017/11/06 21:24:44 UTC

Re: Structured Stream equivalent of reduceByKey

Hmmm, I see.  You could output the delta using flatMapGroupsWithState
<https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->
probably.
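
Roughly something like this (an untested sketch; Event, Delta, and the
per-batch merge logic are just placeholders for whatever you actually have):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, count: Long)   // placeholder input type
case class Delta(key: String, delta: Long)   // per-batch change to emit

// Assumes events: Dataset[Event] read via spark.readStream, with
// import spark.implicits._ in scope for the encoders.
val deltas = events
  .groupByKey(_.key)
  .flatMapGroupsWithState[Long, Delta](
      OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (key, values, state) =>
      // merge only the rows that arrived in this micro-batch
      val batchSum = values.map(_.count).sum
      // keep the running total in state, but emit just the per-batch delta
      state.update(state.getOption.getOrElse(0L) + batchSum)
      Iterator(Delta(key, batchSum))
  }

You would then write deltas out in update mode and keep doing the final
merge at the sink, as you do today.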

On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <pi...@gmail.com>
wrote:

> Thanks, Michael
> I have explored Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator> with
> update mode. The problem is that it gives the overall aggregated value
> for each changed group, while I only want the delta change within the
> group, since we do the aggregation at the sink level too. For example,
> with a count aggregator, a group that sees 3 events in one batch and 2
> in the next is emitted as 3 and then 5, whereas we want 3 and then 2.
>
> Below is the plan generated with a count Aggregator:
>
> *HashAggregate
> StateStoreSave
> *HashAggregate
> StateStoreRestore
> *HashAggregate
> Exchange
> *HashAggregate
> *Project
> StreamingRelation
>
> We are looking for some aggregation that will avoid state-store
> interactions.
>
> Also, is anyone aware of a design doc or example of how we can add a
> new operation on Dataset and the corresponding physical plan?
>
>
>
> On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> - dev
>>
>> I think you should be able to write an Aggregator
>> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
>> You probably want to run in update mode if you are looking for it to output
>> any group that has changed in the batch.
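>>
>> For example, a running count per key might look roughly like this (an
>> untested sketch; the Event type and the console sink are placeholders):
>>
>> import org.apache.spark.sql.{Encoder, Encoders}
>> import org.apache.spark.sql.expressions.Aggregator
>>
>> case class Event(key: String, value: Long)  // placeholder input type
>>
>> // Typed count aggregator; assumes events: Dataset[Event] read via
>> // spark.readStream and import spark.implicits._ for the encoders.
>> val countAgg = new Aggregator[Event, Long, Long] {
>>   def zero: Long = 0L
>>   def reduce(buf: Long, e: Event): Long = buf + 1
>>   def merge(b1: Long, b2: Long): Long = b1 + b2
>>   def finish(buf: Long): Long = buf
>>   def bufferEncoder: Encoder[Long] = Encoders.scalaLong
>>   def outputEncoder: Encoder[Long] = Encoders.scalaLong
>> }.toColumn
>>
>> val counts = events.groupByKey(_.key).agg(countAgg)
>>
>> counts.writeStream
>>   .outputMode("update")  // emit only the groups that changed in each trigger
>>   .format("console")
>>   .start()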
>>
>> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <pi...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> we are migrating some jobs from DStreams to Structured Streaming.
>>>
>>> Currently, to handle aggregations, we call map and reduceByKey on each
>>> RDD, like:
>>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>>
>>> The final output of each RDD is merged into the sink, which itself
>>> supports aggregation (like a coprocessor in HBase).
>>>
>>> In the new Dataset API, I am not finding any suitable way to aggregate
>>> over just the micro-batch.
>>> Most of the aggregation APIs use the state store and provide global
>>> aggregations (with append mode they do not give the change in existing
>>> buckets).
>>> The problems we suspect are:
>>>  1) the state store is tightly linked to the job definition, while in
>>> our case we may want to edit the job while keeping the previously
>>> calculated aggregate as it is.
>>>
>>> The desired result can be achieved with the Dataset APIs below:
>>> dataset.groupByKey(a => a._1).mapGroups((key, valueItr) =>
>>> merge(valueItr))
>>> although, on observing the physical plan, it does not call any merge
>>> before the sort.
>>>
>>>  Is anyone aware of an API or other workaround to get the desired result?
>>>
>>
>>
>