Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2020/03/04 02:02:30 UTC
Stateful Spark Streaming: Required attribute 'value' not found
In a stateful Spark Structured Streaming application, I am writing the 'OutputRow'
produced by 'updateAcrossEvents' to Kafka, but I keep getting this error (*Required
attribute 'value' not found*) when the query tries to write. I know from the
documentation that the 'value' attribute needs to be set, but how do I do that
in Stateful Structured Streaming? Where and how do I add this 'value'
attribute in the following code? *Note: I am using Spark 2.3.1*
withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()
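For context on the error: the Structured Streaming + Kafka integration guide says the Kafka sink reads a required 'value' column (string or binary) and optional 'key' and 'topic' columns. A typed 'OutputRow' keeps its own field names, so the sink finds no 'value' column. The sketch below builds those columns explicitly with 'to_json(struct(...))'. It assumes a hypothetical 'OutputRow' with fields 'id', 'cId' and 'eventCount' (the real class is not shown in the thread). It also uses a small batch Dataset, so it runs without a Kafka broker.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical stand-in for the poster's OutputRow; its real fields are not shown in the thread.
case class OutputRow(id: String, cId: String, eventCount: Long)

object KafkaValueColumnSketch {
  // Shape a typed Dataset into the columns the Kafka sink looks for:
  // an optional "key" and the required "value" (here the whole row as JSON).
  def toKafkaFrame(rows: Dataset[OutputRow]): DataFrame =
    rows.select(
      col("id").as("key"),                                   // hypothetical choice of message key
      to_json(struct(rows.columns.map(col): _*)).as("value") // every field serialized as JSON
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-value-sketch")
      .getOrCreate()
    import spark.implicits._

    val out = toKafkaFrame(Seq(OutputRow("id-1", "c-1", 3L)).toDS())
    out.printSchema() // shows the "key" and "value" columns
    out.show(false)
    spark.stop()
  }
}
```

In the streaming query, the same 'select' would go between 'mapGroupsWithState(...)' and 'writeStream'.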
Re: Stateful Spark Streaming: Required attribute 'value' not found
Posted by Something Something <ma...@gmail.com>.
Simply adding 'toJSON' before 'writeStream' fixed the problem. Maybe
it will help someone.
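Why the fix works: 'toJSON' converts each typed row into one JSON string and returns a Dataset[String]. That Dataset has a single column named 'value', which is the column the Kafka sink requires. The sketch below checks this on a batch Dataset. It uses a hypothetical 'OutputRow' because the real fields are not shown in the thread. The trailing comment shows where the call sits in the streaming chain from the question.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical stand-in for the poster's OutputRow; its real fields are not shown in the thread.
case class OutputRow(id: String, cId: String, eventCount: Long)

object ToJsonFixSketch {
  // toJSON serializes every field of each row into one JSON string.
  // The resulting Dataset[String] has a single column named "value".
  def asKafkaValues(rows: Dataset[OutputRow]): Dataset[String] = rows.toJSON

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("tojson-sketch")
      .getOrCreate()
    import spark.implicits._

    val values = asKafkaValues(Seq(OutputRow("id-1", "c-1", 3L)).toDS())
    println(values.columns.mkString(",")) // prints: value
    values.show(false)

    // In the streaming job the call goes between mapGroupsWithState and writeStream:
    //   .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
    //   .toJSON
    //   .writeStream
    //   .format("kafka")
    spark.stop()
  }
}
```

'toJSON' puts the whole row into the message payload. If a Kafka message key is also needed, selecting the 'key' and 'value' columns explicitly is the more flexible choice.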