Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2020/03/02 22:37:12 UTC

Spark Streaming with mapGroupsWithState

I am writing a stateful streaming application that uses
mapGroupsWithState to create aggregates per group, but I need to create *groups
based on more than one column in the input row*. All the examples in
'Spark: The Definitive Guide' use only one column, such as 'User' or
'Device'. I am using code similar to what's given below. *How do I specify
more than one field in 'groupByKey'?*

There are other challenges as well. The book says we can use
'updateAcrossEvents' the way given below, but I get a compile-time error
saying:


*Error:(43, 65) missing argument list for method updateAcrossEvents in
object Main
Unapplied methods are only converted to functions when a function type is
expected.
You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: the compiler also complains about my *MyReport*: *Error:(41,
12) Unable to find encoder for type stored in a Dataset. Primitive types
(Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._ Support for serializing other types will be
added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in
advance.


withEventTime
  .as[MyReport]
  .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
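
On the encoder error quoted above: the message says encoders are derived only for primitives and case classes, so a class with Java-style getters (which `getKeys.getKey1` hints at, though the post does not show `MyReport`) would need an explicit encoder. A minimal sketch of two options follows, assuming Spark SQL is on the classpath. `ReportRow` and `LegacyReport` are hypothetical stand-ins, not types from the post. If `MyReport` is a proper JavaBean, `Encoders.bean(classOf[MyReport])` may be a third option.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical case-class version of the report: Spark can derive an
// encoder for it (this is what importing spark.implicits._ does implicitly).
final case class ReportRow(key1: String, key2: String, amount: Long)

// Hypothetical plain class standing in for a type with no derivable encoder.
class LegacyReport(val key1: String, val key2: String, val amount: Long)
    extends Serializable

object EncoderSketch {
  // Option 1: a product encoder for the case class, one column per field.
  val rowEncoder: Encoder[ReportRow] = Encoders.product[ReportRow]

  // Option 2: a Kryo-based encoder that stores the whole object as one
  // opaque binary column, so its fields are not queryable as columns.
  val legacyEncoder: Encoder[LegacyReport] = Encoders.kryo[LegacyReport]

  def main(args: Array[String]): Unit = {
    println(rowEncoder.schema.treeString)
    println(legacyEncoder.schema.treeString)
  }
}
```

Declaring either value as `implicit` in scope of `.as[...]` is one way to satisfy the compiler. Converting to a case class is usually the simpler route when the type is under your control.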

Re: Spark Streaming with mapGroupsWithState

Posted by Something Something <ma...@gmail.com>.
I changed the key to a Tuple2 and that problem is solved.

Any thoughts on this message?

*Unapplied methods are only converted to functions when a function type is
expected.*

*You can make this conversion explicit by writing `updateAcrossEvents _` or
`updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*
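
A note on that message: the hint `updateAcrossEvents(_,_,_,_,_)` suggests the method takes five parameters, while `mapGroupsWithState` expects a function of three (the key, an iterator of values, and the `GroupState`). If that is the case here, the method is not converted automatically because its shape does not match the expected function type. The plain-Scala sketch below illustrates the rule without Spark. `applyToGroup`, `sumGroup` and `sumGroupScaled` are hypothetical names made up for the illustration.

```scala
// Plain Scala, no Spark required. Every name below is a hypothetical stand-in.
object EtaExpansionSketch {
  // Stand-in for an API that, like mapGroupsWithState, takes a
  // three-argument function in its second parameter list.
  def applyToGroup[K, V, S, U](key: K, values: Iterator[V], state: S)(
      func: (K, Iterator[V], S) => U): U =
    func(key, values, state)

  // Shape matches (key, values, state): usable as-is.
  def sumGroup(key: String, values: Iterator[Int], state: Int): (String, Int) =
    (key, state + values.sum)

  // One extra parameter: no longer matches a three-argument function type.
  def sumGroupScaled(key: String, values: Iterator[Int], state: Int,
                     factor: Int): (String, Int) =
    (key, (state + values.sum) * factor)

  // Compiles: the expected type is a function type of the same shape, so
  // the method is converted to a function automatically.
  def direct(): (String, Int) =
    applyToGroup("k", Iterator(1, 2, 3), 10)(sumGroup)

  // Passing sumGroupScaled by name would not compile here; a lambda that
  // supplies the extra argument adapts it to the expected shape.
  def adapted(): (String, Int) =
    applyToGroup("k", Iterator(1, 2, 3), 10)((k, vs, s) => sumGroupScaled(k, vs, s, 2))

  def main(args: Array[String]): Unit = {
    println(direct())
    println(adapted())
  }
}
```

Applied to the pipeline in this thread, that would mean either reshaping `updateAcrossEvents` to take exactly the key, the iterator and the state, or wrapping it in a lambda of that shape.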

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi <sh...@gmail.com> wrote:

> maybe you can combine the fields you want to use into one field

Re: Spark Streaming with mapGroupsWithState

Posted by lec ssmi <sh...@gmail.com>.
Maybe you can combine the fields you want to use into one field.
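
One way to combine the fields, and the one the follow-up in this thread reports using, is a Tuple2 key. The sketch below shows the idea on plain Scala collections so it runs without Spark. `Keys` and `Report` are hypothetical stand-ins for `MyReport`, and the `String` key types are an assumption.

```scala
// Plain Scala collections stand in for the streaming Dataset so the sketch
// runs without Spark. Keys and Report are hypothetical stand-ins for MyReport.
final case class Keys(key1: String, key2: String)
final case class Report(keys: Keys, amount: Long)

object CompositeKeySketch {
  // Both fields combined into one Tuple2 key.
  val compositeKey: Report => (String, String) =
    r => (r.keys.key1, r.keys.key2)

  // Local analogue of "group by key, then aggregate each group".
  def totals(reports: Seq[Report]): Map[(String, String), Long] =
    reports.groupBy(compositeKey).map { case (key, group) =>
      key -> group.map(_.amount).sum
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Report(Keys("a", "x"), 1L),
      Report(Keys("a", "x"), 2L),
      Report(Keys("a", "y"), 5L))
    totals(sample).foreach(println)
  }
}
```

In the Spark pipeline from the original post the same idea would read `.groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2))`. The key passed to the state function would then be a tuple rather than a single value, so its first parameter type has to change to match.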
