You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by yunchen liu <yu...@hotmail.com> on 2019/10/17 06:45:35 UTC
spark streaming kafka010 consumer thow l lot of errors
Hey everyone,
spark streaming consumer kafka010 throw a lot of errors, spark program does not exit, but after running for some time many Executor loss and no error.
spark version: 2.4.3
kafka version: 0.10
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:447)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:254)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:307)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135)
val vsRDD = v.mapPartitions(messageIt => {
val signalBuffer = new ArrayBuffer[String]()
while(messageIt.hasNext){
try {
val record = messageIt.next()
val message = record.value()
val topic = record.topic()
val uncompressed = FileUtils.uncompressGz(message)
val signal = new String(uncompressed, "UTF-8")
val rq = JSON.parseObject(signal)
val vehType=rq.getString("VehicleModel")
//TODO 只解析as28lite
if(vehType!=null){
if(vehType.equals("100021")){
val oem = "saic"
val ret = VsParserForSaic.process(oem,rq)
signalBuffer +=ret
}
}
} catch {
case inte: InterruptedException = >
log.error("Publisher thread interrupted. Exception: {}.", record)
case ex: Exception =>
ex.printStackTrace()
log.info(ex.getMessage)
}
}
signalBuffer.iterator
})
any solution for this issue ?
Many Thanks.
YunKillEroe