Posted to dev@spark.apache.org by yunchen liu <yu...@hotmail.com> on 2019/10/17 06:45:35 UTC

spark streaming kafka010 consumer throws a lot of errors

Hey everyone,

    Our Spark Streaming consumer (kafka010) throws a lot of errors. The Spark program does not exit, but after running for some time many executors are lost, with no further errors logged.

    spark version: 2.4.3
    kafka version: 0.10
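
    The consumer in the trace below comes from the spark-streaming-kafka-0-10 integration. For reference, and assuming an sbt build, the dependency would be declared like this:

// sbt coordinates for the 0-10 direct stream integration (assumed setup)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.3"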

org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:447)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:254)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:307)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException

The frames that hit our own code are:

at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:139)
at streaming.VehicelVsLite$$anonfun$main$3$$anonfun$1.apply(VehicelVsLite.scala:135)
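
One thing worth noting about the exception type: what KafkaConsumer.poll actually throws here is org.apache.kafka.common.errors.InterruptException, an unchecked KafkaException that wraps the java.lang.InterruptedException shown in the Caused by. A catch clause for plain InterruptedException therefore never matches it. A minimal sketch of a helper that handles both forms and restores the thread's interrupt flag (safeNext is a hypothetical name, not part of any API):

import org.apache.kafka.common.errors.InterruptException

// Hypothetical helper: pull one element, translating both interrupt shapes
// into None while re-setting the thread's interrupt flag for callers.
def safeNext[A](it: Iterator[A]): Option[A] =
  try {
    Some(it.next())
  } catch {
    case e: InterruptException =>
      // Kafka's RuntimeException wrapper; `case _: InterruptedException` misses this.
      Thread.currentThread().interrupt()
      None
    case e: InterruptedException =>
      Thread.currentThread().interrupt()
      None
  }

The mapPartitions code at those lines: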


// imports assumed by this snippet: fastjson for JSON.parseObject and
// ArrayBuffer from the Scala standard library; FileUtils and
// VsParserForSaic are our own helpers.
import com.alibaba.fastjson.JSON
import scala.collection.mutable.ArrayBuffer

val vsRDD = v.mapPartitions(messageIt => {
  val signalBuffer = new ArrayBuffer[String]()
  while (messageIt.hasNext) {
    try {
      val record = messageIt.next()
      val message = record.value()
      val topic = record.topic()

      val uncompressed = FileUtils.uncompressGz(message)
      val signal = new String(uncompressed, "UTF-8")
      val rq = JSON.parseObject(signal)
      val vehType = rq.getString("VehicleModel")
      // TODO parse only as28lite
      if (vehType != null && vehType.equals("100021")) {
        val oem = "saic"
        val ret = VsParserForSaic.process(oem, rq)
        signalBuffer += ret
      }
    } catch {
      case inte: InterruptedException =>
        // `record` is declared inside the try, so it is out of scope here;
        // log the exception itself instead. Note this clause never matches
        // Kafka's InterruptException (see above).
        log.error("Publisher thread interrupted.", inte)
      case ex: Exception =>
        ex.printStackTrace()
        log.info(ex.getMessage)
    }
  }
  signalBuffer.iterator
})
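
We suspect the InterruptedException means Spark itself is killing the task thread (stage cancelled, executor shutting down), which would line up with the executor losses; our catch swallows it and keeps looping on a dying task. A sketch of what we are considering instead, assuming Spark's TaskContext API (drainWhileAlive is just an illustrative name):

import org.apache.spark.TaskContext

// Illustrative helper: consume an iterator, but stop as soon as Spark has
// asked this task to die instead of catching the interrupt and continuing.
def drainWhileAlive[A](it: Iterator[A])(handle: A => Unit): Unit = {
  val ctx = TaskContext.get() // null only when running outside a Spark task
  while (it.hasNext && (ctx == null || !ctx.isInterrupted())) {
    handle(it.next())
  }
}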

Any solution for this issue?

Many Thanks.
YunKillEroe