You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bahir.apache.org by "lynn (JIRA)" <ji...@apache.org> on 2018/07/25 00:33:00 UTC
[jira] [Created] (BAHIR-175) Recovering from Failures with
Checkpointing Exception (MQTT)
lynn created BAHIR-175:
--------------------------
Summary: Recovering from Failures with Checkpointing Exception(Mqtt)
Key: BAHIR-175
URL: https://issues.apache.org/jira/browse/BAHIR-175
Project: Bahir
Issue Type: Bug
Components: Spark Structured Streaming Connectors
Reporter: lynn
1. Error when reading checkpoint offsets
org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset
Solution:
In the MQTTStreamSource.scala source file,
line 149, getBatch method:
val startIndex = start.getOrElse(LongOffset(0)) match {
case offset: SerializedOffset => offset.json.toInt
case offset: LongOffset => offset.offset.toInt
}
val endIndex = end match {
case offset: SerializedOffset => offset.json.toInt
case offset: LongOffset => offset.offset.toInt
}
2. In the MQTTStreamSource.scala source file,
getBatch method:
val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
// Move consumed messages to persistent store.
(startIndex + 1 to endIndex).foreach { id =>
val element = messages.getOrElse(id, store.retrieve(id))
data += element
store.store(id, element)
messages.remove(id, element)
}
The following line:
val element = messages.getOrElse(id, store.retrieve(id)) throws the error:
java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.runtime.Nothing$
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)