Posted to user@spark.apache.org by Alchemist <al...@gmail.com> on 2018/09/28 09:13:28 UTC

How to repartition Spark DStream Kafka ConsumerRecord RDD.

The Kafka topics I am reading from are unevenly sized, so we want to repartition the input RDD based on some custom logic.
But when I try to apply the repartition, I get an "object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)" error, since repartitioning shuffles the records across executors and each ConsumerRecord has to be serialized for that. I found the following workaround:
 https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
Call rdd.forEachPartition and create the NotSerializable object in there, like this:

    rdd.forEachPartition(iter -> {
      NotSerializable notSerializable = new NotSerializable();
      // ...Now process iter
    });
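For reference, the same pattern in Scala might look like this (a minimal sketch; NotSerializable and the per-record processing are placeholders):

    rdd.foreachPartition { iter =>
      // Constructed on the executor, so the object is never serialized and shipped.
      val notSerializable = new NotSerializable()
      iter.foreach { record =>
        // ... process each record with notSerializable
      }
    }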
I applied it here:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // Kept as a stream of ConsumerRecord, which the mapPartitions below expects.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParam)
    )

    stream.foreachRDD { rdd =>
      val repartitionRDD = flow.repartitionRDD(rdd, 1)  // our custom helper
      println("&&&&&&&&&&&&&& repartitionRDD " + repartitionRDD.count())

      val modifiedRDD = rdd.mapPartitions { iter =>
        // A mutable buffer is needed here: List#:+ returns a new list, so
        // customerRecords :+ consumerRecord would silently discard every record.
        val customerRecords = scala.collection.mutable.ListBuffer[ConsumerRecord[String, String]]()
        while (iter.hasNext) {
          val consumerRecord: ConsumerRecord[String, String] = iter.next()
          customerRecords += consumerRecord
        }
        customerRecords.iterator
      }

      val r = modifiedRDD.repartition(1)
      println("************* after repartition " + r.count())
    }
But I am still getting the same object-not-serializable error. Any help is greatly appreciated.
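P.S. One approach I am considering is to project each ConsumerRecord down to its serializable fields before any shuffle, so the record object itself never crosses the wire. A rough sketch, assuming the (key, value) pairs are all we need downstream and reusing ssc, topics, and kafkaParam from above:

    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParam))

    stream.foreachRDD { rdd =>
      // Only plain (String, String) pairs get shuffled, never ConsumerRecord itself.
      val pairs = rdd.map(record => (record.key(), record.value()))
      val repartitioned = pairs.repartition(1)
      println("after repartition " + repartitioned.count())
    }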