Posted to user@spark.apache.org by aravias <as...@homeaway.com> on 2017/06/07 03:40:40 UTC
StructuredStreaming : org.apache.spark.sql.streaming.StreamingQueryException
Hi,
I have one read stream consuming data from a Kafka topic, and based on an
attribute value in each incoming message, I have to write the data to one of
two different locations in S3 (if value1, write to location1; otherwise, to
location2).
At a high level, this is what I have for doing that:
Dataset<Row> kafkaStreamSet = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBootstrap)
    .option("subscribe", kafkaTopic)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .option("maxOffsetsPerTrigger", offsetsPerTrigger)
    .load();
// raw message to ClickStream
Dataset<ClickStream> ds1 =
    kafkaStreamSet.mapPartitions(processClickStreamMessages,
        Encoders.bean(ClickStream.class));
ClickStream.java has two child objects, and only one of them is populated at a
time, depending on whether the message attribute value is *value1* or *value2*:
1) BookingRequest.java if *value1*
2) PropertyPageView.java if *value2*
I then separate these out of the ClickStream, as below, to write them to two
different locations in S3:
// fetch BookingRequests in the ClickStream
Dataset<BookingRequest> ds2 =
    ds1.map(filterBookingRequests, Encoders.bean(BookingRequest.class));
// fetch PropertyPageViews in the ClickStream
Dataset<PropertyPageView> ds3 =
    ds1.map(filterPropertyPageViews, Encoders.bean(PropertyPageView.class));
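The names filterBookingRequests and filterPropertyPageViews suggest filtering,
but Dataset.map emits exactly one output record per input record, so every
ClickStream, including those that carry no BookingRequest, yields a row in ds2
(what the helper returns in that case is not shown in the post). The sketch
below shows the filter-then-unwrap shape on plain java.util.stream, using
simplified, hypothetical stand-ins for the poster's beans; in the Dataset API
the equivalent would be a FilterFunction passed to ds1.filter(...) followed by
a MapFunction passed to map(...).

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for the poster's beans (the real classes
// are not shown in the post and would need getters/setters for Encoders.bean).
class BookingRequest {
    final String eventDate;
    BookingRequest(String eventDate) { this.eventDate = eventDate; }
}

class PropertyPageView {
    final String eventDate;
    PropertyPageView(String eventDate) { this.eventDate = eventDate; }
}

class ClickStream {
    // Exactly one of these is expected to be non-null per message.
    final BookingRequest bookingRequest;
    final PropertyPageView propertyPageView;

    ClickStream(BookingRequest bookingRequest, PropertyPageView propertyPageView) {
        this.bookingRequest = bookingRequest;
        this.propertyPageView = propertyPageView;
    }
}

class ClickStreamSplitter {
    // Keep only records that carry a BookingRequest, then unwrap it.
    static List<BookingRequest> bookingRequests(List<ClickStream> clicks) {
        return clicks.stream()
                .filter(c -> c.bookingRequest != null)
                .map(c -> c.bookingRequest)
                .collect(Collectors.toList());
    }

    // Keep only records that carry a PropertyPageView, then unwrap it.
    static List<PropertyPageView> pageViews(List<ClickStream> clicks) {
        return clicks.stream()
                .filter(c -> c.propertyPageView != null)
                .map(c -> c.propertyPageView)
                .collect(Collectors.toList());
    }
}
```

Filtering first means each output stream only ever sees records of its own
type, so no null rows reach the Parquet sinks.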
Finally, ds2 and ds3 are written to the two different locations:
StreamingQuery bookingRequestsParquetStreamWriter =
    ds2.writeStream().outputMode("append")
        .format("parquet")
        .trigger(ProcessingTime.create(bookingRequestProcessingTime, TimeUnit.MILLISECONDS))
        .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/bookingRequests")
        .partitionBy("eventDate")
        .start("s3://" + s3Bucket + "/" + bookingRequestPath);
StreamingQuery pageViewsParquetStreamWriter =
    ds3.writeStream().outputMode("append")
        .format("parquet")
        .trigger(ProcessingTime.create(pageViewProcessingTime, TimeUnit.MILLISECONDS))
        .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/PageViews")
        .partitionBy("eventDate")
        .start("s3://" + s3Bucket + "/" + pageViewPath);
// awaitTermination() blocks until the first query stops, so a failure in the
// second query is not surfaced here; sparkSession.streams().awaitAnyTermination()
// returns (or throws) as soon as either query terminates.
bookingRequestsParquetStreamWriter.awaitTermination();
pageViewsParquetStreamWriter.awaitTermination();
It seems to work fine, and I see data written to the two paths when the app is
deployed. But whenever the job is restarted, either after a failure or after a
manual stop and start, it keeps failing with the exception below
(userSessionEventJoin.global is my topic name):
Caused by: java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"userSessionEventJoin.global":{"92":154362528,"101
    at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74)
    at org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59)
    at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134)
    at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138)
If I delete all the checkpoint data, the job starts again and creates new
checkpoints in the two given locations, but that means processing restarts
from the latest offset and all previous offsets are lost.
The Spark version is 2.1. Any suggestions for a resolution would be
appreciated. Thanks.
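The error message quotes the JSON shape used for per-partition Kafka offsets:
a topic name mapped to an object of partition number to offset. The same shape
is accepted by the startingOffsets option, where -2 stands for earliest and -1
for latest, and startingOffsets is only honoured when a query starts without an
existing checkpoint. So, assuming the next offset to read for each partition is
known from some bookkeeping outside the checkpoint (hypothetical here), one
possible way to restart without falling back to "latest" is to delete the
checkpoint and pass those offsets as JSON. A minimal sketch of building that
string; PartitionOffsetsJson and its methods are hypothetical helpers, not
Spark's internal JsonUtils, and the brace check only illustrates why a cut-off
record like the one in the trace cannot be parsed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not Spark's internal JsonUtils.
class PartitionOffsetsJson {

    // Serializes {partition -> offset} for one topic in the shape quoted by the
    // error message, e.g. {"topicA":{"0":23,"1":-1}}. Partitions are emitted in
    // ascending numeric order. As startingOffsets, -2 = earliest, -1 = latest.
    static String toJson(String topic, Map<Integer, Long> offsets) {
        Map<Integer, Long> sorted = new TreeMap<>(offsets);
        StringBuilder sb = new StringBuilder();
        sb.append("{\"").append(topic).append("\":{");
        boolean first = true;
        for (Map.Entry<Integer, Long> e : sorted.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
            first = false;
        }
        return sb.append("}}").toString();
    }

    // Illustration only: a record cut off part-way (like the one in the stack
    // trace) leaves its braces unbalanced. This is not a full JSON validator.
    static boolean looksComplete(String json) {
        int depth = 0;
        for (char ch : json.toCharArray()) {
            if (ch == '{') {
                depth++;
            } else if (ch == '}') {
                depth--;
            }
        }
        return depth == 0 && json.endsWith("}");
    }
}
```

The result would replace "latest" in .option("startingOffsets", ...) on the
readStream() builder. Kafka topic names are limited to letters, digits, '.',
'_' and '-', so the topic needs no JSON escaping. The string grows linearly
with the partition count, which is why a topic with many partitions produces
a long offsets record.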
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StructuredStreaming-org-apache-spark-sql-streaming-StreamingQueryException-tp28749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org
Re: StructuredStreaming : StreamingQueryException
Posted by aravias <as...@homeaway.com>.
The bug is related to long checkpoint entries being truncated when dealing with
topics that have a large number of partitions (120 in my case).
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StructuredStreaming-StreamingQueryException-tp28749p28754.html
Re: StructuredStreaming : StreamingQueryException
Posted by aravias <as...@homeaway.com>.
This is a bug in Spark 2.1.0; it seems to be fixed in Spark 2.1.1, since the
job works when run with that version.
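Since the problem went away on 2.1.1, a fail-fast check at startup can keep the
job from being launched on the affected version by accident. A minimal sketch;
SparkVersionGuard is a hypothetical helper, and it only compares the numeric
parts of dotted version strings, ignoring suffixes such as -SNAPSHOT.

```java
// Hypothetical startup guard; compares only the numeric parts of dotted
// version strings such as "2.1.0" or "2.1.1-SNAPSHOT".
class SparkVersionGuard {

    // Parses the leading digits of a version component ("1-SNAPSHOT" -> 1).
    private static int leadingInt(String s) {
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        return i == 0 ? 0 : Integer.parseInt(s.substring(0, i));
    }

    // Negative if a < b, zero if equal, positive if a > b.
    // Missing components count as 0, so "2.1" equals "2.1.0".
    static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int k = 0; k < n; k++) {
            int x = k < pa.length ? leadingInt(pa[k]) : 0;
            int y = k < pb.length ? leadingInt(pb[k]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    // Throws if the running version is older than the required minimum.
    static void requireAtLeast(String actual, String minimum) {
        if (compare(actual, minimum) < 0) {
            throw new IllegalStateException(
                "Spark " + actual + " is older than the required " + minimum);
        }
    }
}
```

It could be called as SparkVersionGuard.requireAtLeast(sparkSession.version(),
"2.1.1") before starting the queries; SparkSession.version() returns the
version string of the running Spark.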
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StructuredStreaming-StreamingQueryException-tp28749p28753.html