Posted to user@spark.apache.org by aravias <as...@homeaway.com> on 2017/06/07 03:40:40 UTC

StructuredStreaming : org.apache.spark.sql.streaming.StreamingQueryException

Hi,
I have one read stream that consumes data from a Kafka topic and, based on an
attribute value in each incoming message, I have to write the data to one of
two locations in S3 (location1 if the attribute is value1, otherwise
location2).
At a high level, this is what I have:


        Dataset<Row> kafkaStreamSet = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaBootstrap)
            .option("subscribe", kafkaTopic)
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .option("maxOffsetsPerTrigger", offsetsPerTrigger)
            .load();

        // raw message to ClickStream
        Dataset<ClickStream> ds1 = kafkaStreamSet.mapPartitions(
            processClickStreamMessages, Encoders.bean(ClickStream.class));

ClickStream.java has two child objects, and only one of them is populated at
a time, depending on whether the message attribute value is value1 or value2:

 1) BookingRequest.java if value1
 2) PropertyPageView.java if value2

I then separate these out of the ClickStream as below, to write them to two
different locations in S3:

        // fetch BookingRequests in the ClickStream
        Dataset<BookingRequest> ds2 =
            ds1.map(filterBookingRequests, Encoders.bean(BookingRequest.class));

        // fetch PropertyPageViews in the ClickStream
        Dataset<PropertyPageView> ds3 =
            ds1.map(filterPropertyPageViews, Encoders.bean(PropertyPageView.class));


Finally, ds2 and ds3 are written to two different locations:


        StreamingQuery bookingRequestsParquetStreamWriter = ds2.writeStream()
            .outputMode("append")
            .format("parquet")
            .trigger(ProcessingTime.create(bookingRequestProcessingTime, TimeUnit.MILLISECONDS))
            .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/bookingRequests")
            .partitionBy("eventDate")
            .start("s3://" + s3Bucket + "/" + bookingRequestPath);

        StreamingQuery PageViewsParquetStreamWriter = ds3.writeStream()
            .outputMode("append")
            .format("parquet")
            .trigger(ProcessingTime.create(pageViewProcessingTime, TimeUnit.MILLISECONDS))
            .option("checkpointLocation", "s3://" + s3Bucket + "/checkpoint/PageViews")
            .partitionBy("eventDate")
            .start("s3://" + s3Bucket + "/" + pageViewPath);

        bookingRequestsParquetStreamWriter.awaitTermination();

        PageViewsParquetStreamWriter.awaitTermination();



This seems to work fine, and I see data written to the different paths once
the app is deployed. But whenever the job is restarted, after a failure or a
manual stop and start, it keeps failing with the exception below
(userSessionEventJoin.global is my topic name):


Caused by: java.lang.IllegalArgumentException: Expected e.g.
{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got
{"userSessionEventJoin.global":{"92":154362528,"101
	at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74)
	at org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59)
	at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134)
	at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
	at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138)
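
The "got" value in the message above stops partway through a partition key,
so the stored offset JSON itself appears to be cut short. Below is a minimal
sketch, assuming the offset line has been copied out of the checkpoint files
by hand, that checks whether such a string is structurally complete (balanced
braces, no string literal left open). OffsetJsonCheck and looksComplete are
hypothetical names, not part of Spark, and this is not a full JSON parser.

```java
// Hypothetical helper, not part of Spark: a structural sanity check only.
final class OffsetJsonCheck {

    /** Returns true if braces are balanced and no string literal is left open. */
    static boolean looksComplete(String json) {
        int depth = 0;
        boolean inString = false;
        boolean sawBrace = false;
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++;                 // skip the escaped character
                } else if (c == '"') {
                    inString = false;    // closing quote
                }
            } else if (c == '"') {
                inString = true;         // opening quote
            } else if (c == '{') {
                depth++;
                sawBrace = true;
            } else if (c == '}') {
                depth--;
                if (depth < 0) {
                    return false;        // closing brace without an opener
                }
            }
        }
        return sawBrace && depth == 0 && !inString;
    }

    public static void main(String[] args) {
        // The "expected" example and the truncated value from the exception.
        String expected = "{\"topicA\":{\"0\":23,\"1\":-1},\"topicB\":{\"0\":-2}}";
        String got = "{\"userSessionEventJoin.global\":{\"92\":154362528,\"101";
        System.out.println(looksComplete(expected)); // prints true
        System.out.println(looksComplete(got));      // prints false
    }
}
```

A string that fails this check was cut off before it was fully written or
read, which matches the shape of the value in the exception.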


If I delete all the checkpoint information, the job starts again and begins
new checkpointing in the two given locations, but that means I have to start
processing from the latest offset again and lose all previous offsets.
The Spark version is 2.1. Please suggest any resolutions. Thanks.
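
One possible way to avoid jumping to the latest offset after discarding a
checkpoint, assuming the last processed offsets per partition are known from
somewhere else: the Kafka source's startingOffsets option also accepts a JSON
string in the same shape as the "Expected e.g." example in the exception.
The sketch below only builds that string. StartingOffsetsJson and build are
hypothetical names, the offsets are made-up sample values, and I believe
every partition of the topic needs an entry.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spark: formats partition -> offset pairs
// as {"topic":{"0":23,"1":-1}}.
final class StartingOffsetsJson {

    /** Assumes the topic name needs no JSON escaping (no quotes or backslashes). */
    static String build(String topic, Map<Integer, Long> offsets) {
        // TreeMap sorts partitions numerically so the output is deterministic.
        String body = new TreeMap<>(offsets).entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
            .collect(Collectors.joining(","));
        return "{\"" + topic + "\":{" + body + "}}";
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(0, 23L);   // sample offset for partition 0
        offsets.put(1, -1L);   // sample value for partition 1
        System.out.println(build("topicA", offsets));
        // prints {"topicA":{"0":23,"1":-1}}
    }
}
```

The resulting string would be passed as .option("startingOffsets", json) in
place of "latest". As far as I know this option only applies when a query
starts without an existing checkpoint.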



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StructuredStreaming-org-apache-spark-sql-streaming-StreamingQueryException-tp28749.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: StructuredStreaming : StreamingQueryException

Posted by aravias <as...@homeaway.com>.
The bug is related to long checkpoint entries being truncated when a topic
has a large number of partitions, 120 in my case.





Re: StructuredStreaming : StreamingQueryException

Posted by aravias <as...@homeaway.com>.
This is a bug in Spark 2.1.0. It seems to be fixed in Spark 2.1.1: the
problem did not occur when I ran with that version.


