You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Srinivas V <sr...@gmail.com> on 2020/03/27 13:28:22 UTC

spark structured streaming GroupState returns weird values from sate

I am listening to Kafka topic with a structured streaming application with
Java,  testing it on my local Mac.
When I retrieve back GroupState<ProductSessionInformation> object with
state.get(), it is giving some random values for the fields in the object,
some are interchanging some are default and some are junk values.

See this example below:
While setting I am setting:
ProductSessionInformation{requestId='222112345', productId='222112345',
priority='0', firstEventTimeMillis=1585312384,
lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
numberOfEvents=1}

When I retrieve it back, it comes like this:
ProductSessionInformation{requestId='some junk characters are coming here'
productId='222112345', priority='222112345',
firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
firstReceivedTimeMillis=1585312384, numberOfEvents=1}

Any clue why it might be happening? I am stuck with this for couple of
days. Immediate help is appreciated.

code snippet:


public class StateUpdateTask implements
MapGroupsWithStateFunction<String, Event, ProductStateInformation,
ProductSessionUpdate> {

 @Override
public ProductSessionUpdate call(String productId, Iterator<Event>
eventsIterator, GroupState<ProductStateInformation> state) throws
Exception {
    {



  if (state.hasTimedOut()) {

//....

}else{

if (state.exists()) {
    ProductStateInformation oldSession = state.get();
    System.out.println("State for productId:"+productId + " with old
values "+oldSession);

}


public class EventsApp implements Serializable{

public void run(String[] args) throws Exception {

...


Dataset<Row> dataSet = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost")
        .option("startingOffsets","latest")
        .option("failOnDataLoss", "false")
        .option("subscribe", "topic1,topic2")
        .option("includeTimestamp", true)

        .load();

 eventsDS.groupByKey(
                new MapFunction<Event, String>() {
                    @Override public String call(Event event) {
                        return event.getProductId();
                    }
                }, Encoders.STRING())
        .mapGroupsWithState(
                new StateUpdateTask(30000),
                Encoders.bean(ProductSessionInformation.class),
                Encoders.bean(ProductSessionUpdate.class),
                GroupStateTimeout.ProcessingTimeTimeout());

...


StreamingQuery query = productUpdates
        .writeStream()
        .foreach(new ForeachWriter<ProductSessionUpdate>() {
            @Override
            public boolean open(long l, long l1) {return true;}

            @Override
            public void process(ProductSessionUpdate productSessionUpdate) {
                logger.info("-----> query process: "+ productSessionUpdate);
            }

            @Override
            public void close(Throwable throwable) {}
        })
        .outputMode("update")
        .option("checkpointLocation", checkpointDir)
        .start();

query.awaitTermination();

}

public class ProductStateInformation implements Serializable {

    protected String requestId;
    protected String productId;
    protected String priority;
    protected long firstEventTimeMillis;
    protected long lastEventTimeMillis;
    protected long firstReceivedTimeMillis;
    protected int numberOfEvents;

...//getter setters

}

These are are the versions I am using:

<spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
<spark.version>2.4.3</spark.version>

<jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>

<kryo.version>3.0.3</kryo.version>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Jungtaek Lim <ka...@gmail.com>.
That seems to come from the difference how Spark infers schema and create
serializer / deserializer for Java beans to construct bean encoder.

When inferring schema for Java beans, all properties which have getter
methods are considered. When creating serializer / deserializer, only
properties which have both getter and setter methods are considered. Here
Java Introspector is being used which doesn't seem to deal with fields
(it's the JDK feature hence I guess that's the right definition of Java
beans), so having getter methods which have no pair of setter methods might
lead some problems.

The code line has TODO, but the code is ancient (added in 2015) - I have no
context and I'm not 100% sure fixing it would be safer. Maybe it would be
great if you could provide the simple reproducer to play with, but given
you've fixed the issue...

On Tue, Mar 31, 2020 at 5:01 PM Srinivas V <sr...@gmail.com> wrote:

>
> Never mind. It got resolved after I removed extra two getter methods (to
> calculate duration) I created in my State specific Java bean
> (ProductSessionInformation). But I am surprised why it has created so much
> problem. I guess when this bean is converted to Scala class it may not be
> taking care of non getter methods of the fields defined? Still how is that
> causing the state object get corrupt so much?
>
>
> On Sat, Mar 28, 2020 at 7:46 PM Srinivas V <sr...@gmail.com> wrote:
>
>> Ok, I will try to create some simple code to reproduce, if I can. Problem
>> is that I am adding this code in an existing big project with several
>> dependencies with spark streaming older version(2.2) on root level etc.
>>
>> Also, I observed that there is @Experimental on GroupState class. What
>> state is it in now? Several people using this feature in prod?
>>
>>
>> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim <
>> kabhwan.opensource@gmail.com> wrote:
>>
>>> I have't heard known issue for this - that said, this may require new
>>> investigation which is not possible or require huge effort without simple
>>> reproducer.
>>>
>>> Contributors (who are basically volunteers) may not want to struggle to
>>> reproduce from your partial information - I'd recommend you to spend your
>>> time to help volunteers starting from simple reproducer, if you are stuck
>>> at it and have to resolve it.
>>>
>>> Could you please get rid of the business logic which you may want to
>>> redact, and provide full of source code which reproduces the bug?
>>>
>>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>> Sorry for typos , correcting them below
>>>>
>>>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <sr...@gmail.com> wrote:
>>>>
>>>>> Sorry I was just changing some names not to send exact names. Please
>>>>> ignore that. I am really struggling with this since couple of days. Can
>>>>> this happen due to
>>>>> 1. some of the values being null or
>>>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>>>> 3. Not enough memory ?
>>>>> BTW, I am using same names in my code.
>>>>>
>>>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>
>>>>>> Well, the code itself doesn't seem to be OK - you're using
>>>>>> ProductStateInformation as the class of State whereas you provide
>>>>>> ProductSessionInformation to Encoder for State.
>>>>>>
>>>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with
>>>>>>> your class, and call .schema() with the return value to see how it
>>>>>>> transforms to the schema in Spark SQL. The schema must be consistent across
>>>>>>> multiple JVM runs to make it work properly, but I suspect it doesn't retain
>>>>>>> the order.
>>>>>>>
>>>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I am listening to Kafka topic with a structured streaming
>>>>>>>> application with Java,  testing it on my local Mac.
>>>>>>>> When I retrieve back GroupState<ProductSessionInformation> object
>>>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>>>> object, some are interchanging some are default and some are junk values.
>>>>>>>>
>>>>>>>> See this example below:
>>>>>>>> While setting I am setting:
>>>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>>>> numberOfEvents=1}
>>>>>>>>
>>>>>>>> When I retrieve it back, it comes like this:
>>>>>>>> ProductSessionInformation{requestId='some junk characters are
>>>>>>>> coming here' productId='222112345', priority='222112345',
>>>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>>>
>>>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>>>> of days. Immediate help is appreciated.
>>>>>>>>
>>>>>>>> code snippet:
>>>>>>>>
>>>>>>>>
>>>>>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>>>>>
>>>>>>>>  @Override
>>>>>>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>>>>>>     {
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>   if (state.hasTimedOut()) {
>>>>>>>>
>>>>>>>> //....
>>>>>>>>
>>>>>>>> }else{
>>>>>>>>
>>>>>>>> if (state.exists()) {
>>>>>>>>     ProductStateInformation oldSession = state.get();
>>>>>>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> public class EventsApp implements Serializable{
>>>>>>>>
>>>>>>>> public void run(String[] args) throws Exception {
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>>
>>>>>>>> Dataset<Row> dataSet = sparkSession
>>>>>>>>         .readStream()
>>>>>>>>         .format("kafka")
>>>>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>>>>         .option("startingOffsets","latest")
>>>>>>>>         .option("failOnDataLoss", "false")
>>>>>>>>         .option("subscribe", "topic1,topic2")
>>>>>>>>         .option("includeTimestamp", true)
>>>>>>>>
>>>>>>>>         .load();
>>>>>>>>
>>>>>>>>  eventsDS.groupByKey(
>>>>>>>>                 new MapFunction<Event, String>() {
>>>>>>>>                     @Override public String call(Event event) {
>>>>>>>>                         return event.getProductId();
>>>>>>>>                     }
>>>>>>>>                 }, Encoders.STRING())
>>>>>>>>         .mapGroupsWithState(
>>>>>>>>                 new StateUpdateTask(30000),
>>>>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>>
>>>>>>>> StreamingQuery query = productUpdates
>>>>>>>>         .writeStream()
>>>>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>>>>             @Override
>>>>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>>>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>>>>>>             }
>>>>>>>>
>>>>>>>>             @Override
>>>>>>>>             public void close(Throwable throwable) {}
>>>>>>>>         })
>>>>>>>>         .outputMode("update")
>>>>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>>>>         .start();
>>>>>>>>
>>>>>>>> query.awaitTermination();
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> public class ProductStateInformation implements Serializable {
>>>>>>>>
>>>>>>>>     protected String requestId;
>>>>>>>>     protected String productId;
>>>>>>>>     protected String priority;
>>>>>>>>     protected long firstEventTimeMillis;
>>>>>>>>     protected long lastEventTimeMillis;
>>>>>>>>     protected long firstReceivedTimeMillis;
>>>>>>>>     protected int numberOfEvents;
>>>>>>>>
>>>>>>>> ...//getter setters
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> These are are the versions I am using:
>>>>>>>>
>>>>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>>>>> <spark.version>2.4.3</spark.version>
>>>>>>>>
>>>>>>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>>>>>>
>>>>>>>> <kryo.version>3.0.3</kryo.version>
>>>>>>>>
>>>>>>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Srinivas V <sr...@gmail.com>.
Never mind. It got resolved after I removed extra two getter methods (to
calculate duration) I created in my State specific Java bean
(ProductSessionInformation). But I am surprised why it has created so much
problem. I guess when this bean is converted to Scala class it may not be
taking care of non getter methods of the fields defined? Still how is that
causing the state object get corrupt so much?


On Sat, Mar 28, 2020 at 7:46 PM Srinivas V <sr...@gmail.com> wrote:

> Ok, I will try to create some simple code to reproduce, if I can. Problem
> is that I am adding this code in an existing big project with several
> dependencies with spark streaming older version(2.2) on root level etc.
>
> Also, I observed that there is @Experimental on GroupState class. What
> state is it in now? Several people using this feature in prod?
>
>
> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim <ka...@gmail.com>
> wrote:
>
>> I have't heard known issue for this - that said, this may require new
>> investigation which is not possible or require huge effort without simple
>> reproducer.
>>
>> Contributors (who are basically volunteers) may not want to struggle to
>> reproduce from your partial information - I'd recommend you to spend your
>> time to help volunteers starting from simple reproducer, if you are stuck
>> at it and have to resolve it.
>>
>> Could you please get rid of the business logic which you may want to
>> redact, and provide full of source code which reproduces the bug?
>>
>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <sr...@gmail.com> wrote:
>>
>>> Sorry for typos , correcting them below
>>>
>>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>> Sorry I was just changing some names not to send exact names. Please
>>>> ignore that. I am really struggling with this since couple of days. Can
>>>> this happen due to
>>>> 1. some of the values being null or
>>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>>> 3. Not enough memory ?
>>>> BTW, I am using same names in my code.
>>>>
>>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>>> kabhwan.opensource@gmail.com> wrote:
>>>>
>>>>> Well, the code itself doesn't seem to be OK - you're using
>>>>> ProductStateInformation as the class of State whereas you provide
>>>>> ProductSessionInformation to Encoder for State.
>>>>>
>>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>
>>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with
>>>>>> your class, and call .schema() with the return value to see how it
>>>>>> transforms to the schema in Spark SQL. The schema must be consistent across
>>>>>> multiple JVM runs to make it work properly, but I suspect it doesn't retain
>>>>>> the order.
>>>>>>
>>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am listening to Kafka topic with a structured streaming
>>>>>>> application with Java,  testing it on my local Mac.
>>>>>>> When I retrieve back GroupState<ProductSessionInformation> object
>>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>>> object, some are interchanging some are default and some are junk values.
>>>>>>>
>>>>>>> See this example below:
>>>>>>> While setting I am setting:
>>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>>> numberOfEvents=1}
>>>>>>>
>>>>>>> When I retrieve it back, it comes like this:
>>>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>>>> here' productId='222112345', priority='222112345',
>>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>>
>>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>>> of days. Immediate help is appreciated.
>>>>>>>
>>>>>>> code snippet:
>>>>>>>
>>>>>>>
>>>>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>>>>
>>>>>>>  @Override
>>>>>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>>>>>     {
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>   if (state.hasTimedOut()) {
>>>>>>>
>>>>>>> //....
>>>>>>>
>>>>>>> }else{
>>>>>>>
>>>>>>> if (state.exists()) {
>>>>>>>     ProductStateInformation oldSession = state.get();
>>>>>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> public class EventsApp implements Serializable{
>>>>>>>
>>>>>>> public void run(String[] args) throws Exception {
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> Dataset<Row> dataSet = sparkSession
>>>>>>>         .readStream()
>>>>>>>         .format("kafka")
>>>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>>>         .option("startingOffsets","latest")
>>>>>>>         .option("failOnDataLoss", "false")
>>>>>>>         .option("subscribe", "topic1,topic2")
>>>>>>>         .option("includeTimestamp", true)
>>>>>>>
>>>>>>>         .load();
>>>>>>>
>>>>>>>  eventsDS.groupByKey(
>>>>>>>                 new MapFunction<Event, String>() {
>>>>>>>                     @Override public String call(Event event) {
>>>>>>>                         return event.getProductId();
>>>>>>>                     }
>>>>>>>                 }, Encoders.STRING())
>>>>>>>         .mapGroupsWithState(
>>>>>>>                 new StateUpdateTask(30000),
>>>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> StreamingQuery query = productUpdates
>>>>>>>         .writeStream()
>>>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>>>             @Override
>>>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>>>>>             }
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public void close(Throwable throwable) {}
>>>>>>>         })
>>>>>>>         .outputMode("update")
>>>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>>>         .start();
>>>>>>>
>>>>>>> query.awaitTermination();
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> public class ProductStateInformation implements Serializable {
>>>>>>>
>>>>>>>     protected String requestId;
>>>>>>>     protected String productId;
>>>>>>>     protected String priority;
>>>>>>>     protected long firstEventTimeMillis;
>>>>>>>     protected long lastEventTimeMillis;
>>>>>>>     protected long firstReceivedTimeMillis;
>>>>>>>     protected int numberOfEvents;
>>>>>>>
>>>>>>> ...//getter setters
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> These are are the versions I am using:
>>>>>>>
>>>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>>>> <spark.version>2.4.3</spark.version>
>>>>>>>
>>>>>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>>>>>
>>>>>>> <kryo.version>3.0.3</kryo.version>
>>>>>>>
>>>>>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Srinivas V <sr...@gmail.com>.
Ok, I will try to create some simple code to reproduce, if I can. Problem
is that I am adding this code in an existing big project with several
dependencies with spark streaming older version(2.2) on root level etc.

Also, I observed that there is @Experimental on GroupState class. What
state is it in now? Several people using this feature in prod?


On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim <ka...@gmail.com>
wrote:

> I have't heard known issue for this - that said, this may require new
> investigation which is not possible or require huge effort without simple
> reproducer.
>
> Contributors (who are basically volunteers) may not want to struggle to
> reproduce from your partial information - I'd recommend you to spend your
> time to help volunteers starting from simple reproducer, if you are stuck
> at it and have to resolve it.
>
> Could you please get rid of the business logic which you may want to
> redact, and provide full of source code which reproduces the bug?
>
> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <sr...@gmail.com> wrote:
>
>> Sorry for typos , correcting them below
>>
>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <sr...@gmail.com> wrote:
>>
>>> Sorry I was just changing some names not to send exact names. Please
>>> ignore that. I am really struggling with this since couple of days. Can
>>> this happen due to
>>> 1. some of the values being null or
>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>> 3. Not enough memory ?
>>> BTW, I am using same names in my code.
>>>
>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>> kabhwan.opensource@gmail.com> wrote:
>>>
>>>> Well, the code itself doesn't seem to be OK - you're using
>>>> ProductStateInformation as the class of State whereas you provide
>>>> ProductSessionInformation to Encoder for State.
>>>>
>>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>>> kabhwan.opensource@gmail.com> wrote:
>>>>
>>>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>>>> class, and call .schema() with the return value to see how it transforms to
>>>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>>>
>>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I am listening to Kafka topic with a structured streaming application
>>>>>> with Java,  testing it on my local Mac.
>>>>>> When I retrieve back GroupState<ProductSessionInformation> object
>>>>>> with state.get(), it is giving some random values for the fields in the
>>>>>> object, some are interchanging some are default and some are junk values.
>>>>>>
>>>>>> See this example below:
>>>>>> While setting I am setting:
>>>>>> ProductSessionInformation{requestId='222112345',
>>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>>> numberOfEvents=1}
>>>>>>
>>>>>> When I retrieve it back, it comes like this:
>>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>>> here' productId='222112345', priority='222112345',
>>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>>
>>>>>> Any clue why it might be happening? I am stuck with this for couple
>>>>>> of days. Immediate help is appreciated.
>>>>>>
>>>>>> code snippet:
>>>>>>
>>>>>>
>>>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>>>
>>>>>>  @Override
>>>>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>>>>     {
>>>>>>
>>>>>>
>>>>>>
>>>>>>   if (state.hasTimedOut()) {
>>>>>>
>>>>>> //....
>>>>>>
>>>>>> }else{
>>>>>>
>>>>>> if (state.exists()) {
>>>>>>     ProductStateInformation oldSession = state.get();
>>>>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> public class EventsApp implements Serializable{
>>>>>>
>>>>>> public void run(String[] args) throws Exception {
>>>>>>
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> Dataset<Row> dataSet = sparkSession
>>>>>>         .readStream()
>>>>>>         .format("kafka")
>>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>>         .option("startingOffsets","latest")
>>>>>>         .option("failOnDataLoss", "false")
>>>>>>         .option("subscribe", "topic1,topic2")
>>>>>>         .option("includeTimestamp", true)
>>>>>>
>>>>>>         .load();
>>>>>>
>>>>>>  eventsDS.groupByKey(
>>>>>>                 new MapFunction<Event, String>() {
>>>>>>                     @Override public String call(Event event) {
>>>>>>                         return event.getProductId();
>>>>>>                     }
>>>>>>                 }, Encoders.STRING())
>>>>>>         .mapGroupsWithState(
>>>>>>                 new StateUpdateTask(30000),
>>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> StreamingQuery query = productUpdates
>>>>>>         .writeStream()
>>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>>             @Override
>>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>>
>>>>>>             @Override
>>>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>>>>             }
>>>>>>
>>>>>>             @Override
>>>>>>             public void close(Throwable throwable) {}
>>>>>>         })
>>>>>>         .outputMode("update")
>>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>>         .start();
>>>>>>
>>>>>> query.awaitTermination();
>>>>>>
>>>>>> }
>>>>>>
>>>>>> public class ProductStateInformation implements Serializable {
>>>>>>
>>>>>>     protected String requestId;
>>>>>>     protected String productId;
>>>>>>     protected String priority;
>>>>>>     protected long firstEventTimeMillis;
>>>>>>     protected long lastEventTimeMillis;
>>>>>>     protected long firstReceivedTimeMillis;
>>>>>>     protected int numberOfEvents;
>>>>>>
>>>>>> ...//getter setters
>>>>>>
>>>>>> }
>>>>>>
>>>>>> These are are the versions I am using:
>>>>>>
>>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>>> <spark.version>2.4.3</spark.version>
>>>>>>
>>>>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>>>>
>>>>>> <kryo.version>3.0.3</kryo.version>
>>>>>>
>>>>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Jungtaek Lim <ka...@gmail.com>.
I have't heard known issue for this - that said, this may require new
investigation which is not possible or require huge effort without simple
reproducer.

Contributors (who are basically volunteers) may not want to struggle to
reproduce from your partial information - I'd recommend you to spend your
time to help volunteers starting from simple reproducer, if you are stuck
at it and have to resolve it.

Could you please get rid of the business logic which you may want to
redact, and provide full of source code which reproduces the bug?

On Sat, Mar 28, 2020 at 8:11 PM Srinivas V <sr...@gmail.com> wrote:

> Sorry for typos , correcting them below
>
> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <sr...@gmail.com> wrote:
>
>> Sorry I was just changing some names not to send exact names. Please
>> ignore that. I am really struggling with this since couple of days. Can
>> this happen due to
>> 1. some of the values being null or
>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>> 3. Not enough memory ?
>> BTW, I am using same names in my code.
>>
>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>> kabhwan.opensource@gmail.com> wrote:
>>
>>> Well, the code itself doesn't seem to be OK - you're using
>>> ProductStateInformation as the class of State whereas you provide
>>> ProductSessionInformation to Encoder for State.
>>>
>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>> kabhwan.opensource@gmail.com> wrote:
>>>
>>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>>> class, and call .schema() with the return value to see how it transforms to
>>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>>
>>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com>
>>>> wrote:
>>>>
>>>>> I am listening to Kafka topic with a structured streaming application
>>>>> with Java,  testing it on my local Mac.
>>>>> When I retrieve back GroupState<ProductSessionInformation> object
>>>>> with state.get(), it is giving some random values for the fields in the
>>>>> object, some are interchanging some are default and some are junk values.
>>>>>
>>>>> See this example below:
>>>>> While setting I am setting:
>>>>> ProductSessionInformation{requestId='222112345',
>>>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>>> numberOfEvents=1}
>>>>>
>>>>> When I retrieve it back, it comes like this:
>>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>>> here' productId='222112345', priority='222112345',
>>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>>
>>>>> Any clue why it might be happening? I am stuck with this for couple of
>>>>> days. Immediate help is appreciated.
>>>>>
>>>>> code snippet:
>>>>>
>>>>>
>>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>>
>>>>>  @Override
>>>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>>>     {
>>>>>
>>>>>
>>>>>
>>>>>   if (state.hasTimedOut()) {
>>>>>
>>>>> //....
>>>>>
>>>>> }else{
>>>>>
>>>>> if (state.exists()) {
>>>>>     ProductStateInformation oldSession = state.get();
>>>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> public class EventsApp implements Serializable{
>>>>>
>>>>> public void run(String[] args) throws Exception {
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>> Dataset<Row> dataSet = sparkSession
>>>>>         .readStream()
>>>>>         .format("kafka")
>>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>>         .option("startingOffsets","latest")
>>>>>         .option("failOnDataLoss", "false")
>>>>>         .option("subscribe", "topic1,topic2")
>>>>>         .option("includeTimestamp", true)
>>>>>
>>>>>         .load();
>>>>>
>>>>>  eventsDS.groupByKey(
>>>>>                 new MapFunction<Event, String>() {
>>>>>                     @Override public String call(Event event) {
>>>>>                         return event.getProductId();
>>>>>                     }
>>>>>                 }, Encoders.STRING())
>>>>>         .mapGroupsWithState(
>>>>>                 new StateUpdateTask(30000),
>>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>> StreamingQuery query = productUpdates
>>>>>         .writeStream()
>>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>>             @Override
>>>>>             public boolean open(long l, long l1) {return true;}
>>>>>
>>>>>             @Override
>>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void close(Throwable throwable) {}
>>>>>         })
>>>>>         .outputMode("update")
>>>>>         .option("checkpointLocation", checkpointDir)
>>>>>         .start();
>>>>>
>>>>> query.awaitTermination();
>>>>>
>>>>> }
>>>>>
>>>>> public class ProductStateInformation implements Serializable {
>>>>>
>>>>>     protected String requestId;
>>>>>     protected String productId;
>>>>>     protected String priority;
>>>>>     protected long firstEventTimeMillis;
>>>>>     protected long lastEventTimeMillis;
>>>>>     protected long firstReceivedTimeMillis;
>>>>>     protected int numberOfEvents;
>>>>>
>>>>> ...//getter setters
>>>>>
>>>>> }
>>>>>
>>>>> These are are the versions I am using:
>>>>>
>>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>>> <spark.version>2.4.3</spark.version>
>>>>>
>>>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>>>
>>>>> <kryo.version>3.0.3</kryo.version>
>>>>>
>>>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Srinivas V <sr...@gmail.com>.
Sorry for typos , correcting them below

On Sat, Mar 28, 2020 at 4:39 PM Srinivas V <sr...@gmail.com> wrote:

> Sorry I was just changing some names not to send exact names. Please
> ignore that. I am really struggling with this since couple of days. Can
> this happen due to
> 1. some of the values being null or
> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
> 3. Not enough memory ?
> BTW, I am using same names in my code.
>
> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
> kabhwan.opensource@gmail.com> wrote:
>
>> Well, the code itself doesn't seem to be OK - you're using
>> ProductStateInformation as the class of State whereas you provide
>> ProductSessionInformation to Encoder for State.
>>
>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>> kabhwan.opensource@gmail.com> wrote:
>>
>>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>>> class, and call .schema() with the return value to see how it transforms to
>>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>>> runs to make it work properly, but I suspect it doesn't retain the order.
>>>
>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>> I am listening to Kafka topic with a structured streaming application
>>>> with Java,  testing it on my local Mac.
>>>> When I retrieve back GroupState<ProductSessionInformation> object with
>>>> state.get(), it is giving some random values for the fields in the object,
>>>> some are interchanging some are default and some are junk values.
>>>>
>>>> See this example below:
>>>> While setting I am setting:
>>>> ProductSessionInformation{requestId='222112345', productId='222112345',
>>>> priority='0', firstEventTimeMillis=1585312384,
>>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>>> numberOfEvents=1}
>>>>
>>>> When I retrieve it back, it comes like this:
>>>> ProductSessionInformation{requestId='some junk characters are coming
>>>> here' productId='222112345', priority='222112345',
>>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>>
>>>> Any clue why it might be happening? I am stuck with this for couple of
>>>> days. Immediate help is appreciated.
>>>>
>>>> code snippet:
>>>>
>>>>
>>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>>
>>>>  @Override
>>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>>     {
>>>>
>>>>
>>>>
>>>>   if (state.hasTimedOut()) {
>>>>
>>>> //....
>>>>
>>>> }else{
>>>>
>>>> if (state.exists()) {
>>>>     ProductStateInformation oldSession = state.get();
>>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>>
>>>> }
>>>>
>>>>
>>>> public class EventsApp implements Serializable{
>>>>
>>>> public void run(String[] args) throws Exception {
>>>>
>>>> ...
>>>>
>>>>
>>>> Dataset<Row> dataSet = sparkSession
>>>>         .readStream()
>>>>         .format("kafka")
>>>>         .option("kafka.bootstrap.servers", "localhost")
>>>>         .option("startingOffsets","latest")
>>>>         .option("failOnDataLoss", "false")
>>>>         .option("subscribe", "topic1,topic2")
>>>>         .option("includeTimestamp", true)
>>>>
>>>>         .load();
>>>>
>>>>  eventsDS.groupByKey(
>>>>                 new MapFunction<Event, String>() {
>>>>                     @Override public String call(Event event) {
>>>>                         return event.getProductId();
>>>>                     }
>>>>                 }, Encoders.STRING())
>>>>         .mapGroupsWithState(
>>>>                 new StateUpdateTask(30000),
>>>>                 Encoders.bean(ProductSessionInformation.class),
>>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> ...
>>>>
>>>>
>>>> StreamingQuery query = productUpdates
>>>>         .writeStream()
>>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>>             @Override
>>>>             public boolean open(long l, long l1) {return true;}
>>>>
>>>>             @Override
>>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>>             }
>>>>
>>>>             @Override
>>>>             public void close(Throwable throwable) {}
>>>>         })
>>>>         .outputMode("update")
>>>>         .option("checkpointLocation", checkpointDir)
>>>>         .start();
>>>>
>>>> query.awaitTermination();
>>>>
>>>> }
>>>>
>>>> public class ProductStateInformation implements Serializable {
>>>>
>>>>     protected String requestId;
>>>>     protected String productId;
>>>>     protected String priority;
>>>>     protected long firstEventTimeMillis;
>>>>     protected long lastEventTimeMillis;
>>>>     protected long firstReceivedTimeMillis;
>>>>     protected int numberOfEvents;
>>>>
>>>> ...//getter setters
>>>>
>>>> }
>>>>
>>>> These are are the versions I am using:
>>>>
>>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>>> <spark.version>2.4.3</spark.version>
>>>>
>>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>>
>>>> <kryo.version>3.0.3</kryo.version>
>>>>
>>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Srinivas V <sr...@gmail.com>.
Sorry I was just changing some names not to send exact names. Please ignore
that. I am really struggling with this sine couple of days. Can this happen
due to
1. some of the values being null or
2.UTF8  issue ? Or some sterilization/ deserilization issue ?
3. Not enough memory ?
I am using same names in my code.

On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <ka...@gmail.com>
wrote:

> Well, the code itself doesn't seem to be OK - you're using
> ProductStateInformation as the class of State whereas you provide
> ProductSessionInformation to Encoder for State.
>
> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
> kabhwan.opensource@gmail.com> wrote:
>
>> Could you play with Encoders.bean()? You can Encoders.bean() with your
>> class, and call .schema() with the return value to see how it transforms to
>> the schema in Spark SQL. The schema must be consistent across multiple JVM
>> runs to make it work properly, but I suspect it doesn't retain the order.
>>
>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com> wrote:
>>
>>> I am listening to Kafka topic with a structured streaming application
>>> with Java,  testing it on my local Mac.
>>> When I retrieve back GroupState<ProductSessionInformation> object with
>>> state.get(), it is giving some random values for the fields in the object,
>>> some are interchanging some are default and some are junk values.
>>>
>>> See this example below:
>>> While setting I am setting:
>>> ProductSessionInformation{requestId='222112345', productId='222112345',
>>> priority='0', firstEventTimeMillis=1585312384,
>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>> numberOfEvents=1}
>>>
>>> When I retrieve it back, it comes like this:
>>> ProductSessionInformation{requestId='some junk characters are coming
>>> here' productId='222112345', priority='222112345',
>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>
>>> Any clue why it might be happening? I am stuck with this for couple of
>>> days. Immediate help is appreciated.
>>>
>>> code snippet:
>>>
>>>
>>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>>
>>>  @Override
>>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>>     {
>>>
>>>
>>>
>>>   if (state.hasTimedOut()) {
>>>
>>> //....
>>>
>>> }else{
>>>
>>> if (state.exists()) {
>>>     ProductStateInformation oldSession = state.get();
>>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>>
>>> }
>>>
>>>
>>> public class EventsApp implements Serializable{
>>>
>>> public void run(String[] args) throws Exception {
>>>
>>> ...
>>>
>>>
>>> Dataset<Row> dataSet = sparkSession
>>>         .readStream()
>>>         .format("kafka")
>>>         .option("kafka.bootstrap.servers", "localhost")
>>>         .option("startingOffsets","latest")
>>>         .option("failOnDataLoss", "false")
>>>         .option("subscribe", "topic1,topic2")
>>>         .option("includeTimestamp", true)
>>>
>>>         .load();
>>>
>>>  eventsDS.groupByKey(
>>>                 new MapFunction<Event, String>() {
>>>                     @Override public String call(Event event) {
>>>                         return event.getProductId();
>>>                     }
>>>                 }, Encoders.STRING())
>>>         .mapGroupsWithState(
>>>                 new StateUpdateTask(30000),
>>>                 Encoders.bean(ProductSessionInformation.class),
>>>                 Encoders.bean(ProductSessionUpdate.class),
>>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> ...
>>>
>>>
>>> StreamingQuery query = productUpdates
>>>         .writeStream()
>>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>>             @Override
>>>             public boolean open(long l, long l1) {return true;}
>>>
>>>             @Override
>>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>>             }
>>>
>>>             @Override
>>>             public void close(Throwable throwable) {}
>>>         })
>>>         .outputMode("update")
>>>         .option("checkpointLocation", checkpointDir)
>>>         .start();
>>>
>>> query.awaitTermination();
>>>
>>> }
>>>
>>> public class ProductStateInformation implements Serializable {
>>>
>>>     protected String requestId;
>>>     protected String productId;
>>>     protected String priority;
>>>     protected long firstEventTimeMillis;
>>>     protected long lastEventTimeMillis;
>>>     protected long firstReceivedTimeMillis;
>>>     protected int numberOfEvents;
>>>
>>> ...//getter setters
>>>
>>> }
>>>
>>> These are are the versions I am using:
>>>
>>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>>> <spark.version>2.4.3</spark.version>
>>>
>>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>>
>>> <kryo.version>3.0.3</kryo.version>
>>>
>>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Jungtaek Lim <ka...@gmail.com>.
Well, the code itself doesn't seem to be OK - you're using
ProductStateInformation as the class of State whereas you provide
ProductSessionInformation to Encoder for State.

On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <ka...@gmail.com>
wrote:

> Could you play with Encoders.bean()? You can Encoders.bean() with your
> class, and call .schema() with the return value to see how it transforms to
> the schema in Spark SQL. The schema must be consistent across multiple JVM
> runs to make it work properly, but I suspect it doesn't retain the order.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com> wrote:
>
>> I am listening to Kafka topic with a structured streaming application
>> with Java,  testing it on my local Mac.
>> When I retrieve back GroupState<ProductSessionInformation> object with
>> state.get(), it is giving some random values for the fields in the object,
>> some are interchanging some are default and some are junk values.
>>
>> See this example below:
>> While setting I am setting:
>> ProductSessionInformation{requestId='222112345', productId='222112345',
>> priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it comes like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why it might be happening? I am stuck with this for couple of
>> days. Immediate help is appreciated.
>>
>> code snippet:
>>
>>
>> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>>
>>  @Override
>> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>>     {
>>
>>
>>
>>   if (state.hasTimedOut()) {
>>
>> //....
>>
>> }else{
>>
>> if (state.exists()) {
>>     ProductStateInformation oldSession = state.get();
>>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>>
>> }
>>
>>
>> public class EventsApp implements Serializable{
>>
>> public void run(String[] args) throws Exception {
>>
>> ...
>>
>>
>> Dataset<Row> dataSet = sparkSession
>>         .readStream()
>>         .format("kafka")
>>         .option("kafka.bootstrap.servers", "localhost")
>>         .option("startingOffsets","latest")
>>         .option("failOnDataLoss", "false")
>>         .option("subscribe", "topic1,topic2")
>>         .option("includeTimestamp", true)
>>
>>         .load();
>>
>>  eventsDS.groupByKey(
>>                 new MapFunction<Event, String>() {
>>                     @Override public String call(Event event) {
>>                         return event.getProductId();
>>                     }
>>                 }, Encoders.STRING())
>>         .mapGroupsWithState(
>>                 new StateUpdateTask(30000),
>>                 Encoders.bean(ProductSessionInformation.class),
>>                 Encoders.bean(ProductSessionUpdate.class),
>>                 GroupStateTimeout.ProcessingTimeTimeout());
>>
>> ...
>>
>>
>> StreamingQuery query = productUpdates
>>         .writeStream()
>>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>>             @Override
>>             public boolean open(long l, long l1) {return true;}
>>
>>             @Override
>>             public void process(ProductSessionUpdate productSessionUpdate) {
>>                 logger.info("-----> query process: "+ productSessionUpdate);
>>             }
>>
>>             @Override
>>             public void close(Throwable throwable) {}
>>         })
>>         .outputMode("update")
>>         .option("checkpointLocation", checkpointDir)
>>         .start();
>>
>> query.awaitTermination();
>>
>> }
>>
>> public class ProductStateInformation implements Serializable {
>>
>>     protected String requestId;
>>     protected String productId;
>>     protected String priority;
>>     protected long firstEventTimeMillis;
>>     protected long lastEventTimeMillis;
>>     protected long firstReceivedTimeMillis;
>>     protected int numberOfEvents;
>>
>> ...//getter setters
>>
>> }
>>
>> These are are the versions I am using:
>>
>> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
>> <spark.version>2.4.3</spark.version>
>>
>> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>>
>> <kryo.version>3.0.3</kryo.version>
>>
>>

Re: spark structured streaming GroupState returns weird values from sate

Posted by Jungtaek Lim <ka...@gmail.com>.
Could you play with Encoders.bean()? You can Encoders.bean() with your
class, and call .schema() with the return value to see how it transforms to
the schema in Spark SQL. The schema must be consistent across multiple JVM
runs to make it work properly, but I suspect it doesn't retain the order.

On Fri, Mar 27, 2020 at 10:28 PM Srinivas V <sr...@gmail.com> wrote:

> I am listening to Kafka topic with a structured streaming application with
> Java,  testing it on my local Mac.
> When I retrieve back GroupState<ProductSessionInformation> object with
> state.get(), it is giving some random values for the fields in the object,
> some are interchanging some are default and some are junk values.
>
> See this example below:
> While setting I am setting:
> ProductSessionInformation{requestId='222112345', productId='222112345',
> priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it back, it comes like this:
> ProductSessionInformation{requestId='some junk characters are coming here'
> productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why it might be happening? I am stuck with this for couple of
> days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>
>  @Override
> public ProductSessionUpdate call(String productId, Iterator<Event> eventsIterator, GroupState<ProductStateInformation> state) throws Exception {
>     {
>
>
>
>   if (state.hasTimedOut()) {
>
> //....
>
> }else{
>
> if (state.exists()) {
>     ProductStateInformation oldSession = state.get();
>     System.out.println("State for productId:"+productId + " with old values "+oldSession);
>
> }
>
>
> public class EventsApp implements Serializable{
>
> public void run(String[] args) throws Exception {
>
> ...
>
>
> Dataset<Row> dataSet = sparkSession
>         .readStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "localhost")
>         .option("startingOffsets","latest")
>         .option("failOnDataLoss", "false")
>         .option("subscribe", "topic1,topic2")
>         .option("includeTimestamp", true)
>
>         .load();
>
>  eventsDS.groupByKey(
>                 new MapFunction<Event, String>() {
>                     @Override public String call(Event event) {
>                         return event.getProductId();
>                     }
>                 }, Encoders.STRING())
>         .mapGroupsWithState(
>                 new StateUpdateTask(30000),
>                 Encoders.bean(ProductSessionInformation.class),
>                 Encoders.bean(ProductSessionUpdate.class),
>                 GroupStateTimeout.ProcessingTimeTimeout());
>
> ...
>
>
> StreamingQuery query = productUpdates
>         .writeStream()
>         .foreach(new ForeachWriter<ProductSessionUpdate>() {
>             @Override
>             public boolean open(long l, long l1) {return true;}
>
>             @Override
>             public void process(ProductSessionUpdate productSessionUpdate) {
>                 logger.info("-----> query process: "+ productSessionUpdate);
>             }
>
>             @Override
>             public void close(Throwable throwable) {}
>         })
>         .outputMode("update")
>         .option("checkpointLocation", checkpointDir)
>         .start();
>
> query.awaitTermination();
>
> }
>
> public class ProductStateInformation implements Serializable {
>
>     protected String requestId;
>     protected String productId;
>     protected String priority;
>     protected long firstEventTimeMillis;
>     protected long lastEventTimeMillis;
>     protected long firstReceivedTimeMillis;
>     protected int numberOfEvents;
>
> ...//getter setters
>
> }
>
> These are are the versions I am using:
>
> <spark-cassandra-connector.version>2.3.1</spark-cassandra-connector.version>
> <spark.version>2.4.3</spark.version>
>
> <jackson.version>2.6.6</jackson.version><kafka.version>0.10.2.0</kafka.version>
>
> <kryo.version>3.0.3</kryo.version>
>
>