Posted to issues@spark.apache.org by "Nicolas PHUNG (JIRA)" <ji...@apache.org> on 2015/06/02 16:24:21 UTC

[jira] [Comment Edited] (SPARK-7122) KafkaUtils.createDirectStream - unreasonable processing time in absence of load

    [ https://issues.apache.org/jira/browse/SPARK-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569147#comment-14569147 ] 

Nicolas PHUNG edited comment on SPARK-7122 at 6/2/15 2:23 PM:
--------------------------------------------------------------

Mode 1 (old Kafka Streaming API)
{code}
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark

// kafkaBrokers, groupId, zookeeperConnect, kafkaSchemaRegistryUrl, kafkaTopics
// and esIndex are configuration values defined elsewhere in the real application.
object HotFCANextGen {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("MyApp")

    sparkConf.setIfMissing("spark.master", "local[*]")

    sparkConf.set("spark.eventLog.overwrite", "true")
    sparkConf.set("spark.shuffle.consolidateFiles", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafkaBrokers,
      "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> "smallest",
      "schema.registry.url" -> kafkaSchemaRegistryUrl)
    val topicMap = kafkaTopics.split(",").map((_, 2)).toMap

    val messages = KafkaUtils.createStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK)

    val analyticsEventsRaw = messages.map(_._2)

    val analyticEventStream: DStream[AnalyticEventEnriched] = analyticsEventsRaw.map(
      avroMessage => {
        val record: GenericRecord = avroMessage.asInstanceOf[GenericRecord]
        val analyticEvent: AnalyticEventEnriched = records.AnalyticEventEnrichedRecordReader.read(record)
        analyticEvent
      }
    )

    analyticEventStream.foreachRDD(rdd => {
      rdd.take(11).foreach(
        element =>
          println(s"Saving to ES $element"))
      EsSpark.saveJsonToEs(rdd, esIndex)
    })

    sys.ShutdownHookThread {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

Mode 2 (new Kafka Streaming API with checkpoint)
{code}
import io.confluent.kafka.serializers.KafkaAvroDecoder
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark

// kafkaBrokers, groupId, zookeeperConnect, kafkaSchemaRegistryUrl, kafkaTopics,
// esIndex and checkpointDirectory are configuration values defined elsewhere in the real application.
object HotFCANextGen {
  def main(args: Array[String]) {
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext)

    sys.ShutdownHookThread {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("MyApp")

    sparkConf.setIfMissing("spark.master", "local[*]")

    sparkConf.set("spark.eventLog.overwrite", "true")
    sparkConf.set("spark.shuffle.consolidateFiles", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> kafkaBrokers,
      "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> "smallest",
      "schema.registry.url" -> kafkaSchemaRegistryUrl)
    val topicsSet = kafkaTopics.split(",").toSet

    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)

    val analyticsEventsRaw = messages.map(_._2)

    val analyticEventStream: DStream[AnalyticEventEnriched] = analyticsEventsRaw.map(
      avroMessage => {

        val record: GenericRecord = avroMessage.asInstanceOf[GenericRecord]

        // Main avro record analytic event
        val analyticEvent: AnalyticEventEnriched = records.AnalyticEventEnrichedRecordReader.read(record)

        analyticEvent
      }
    )

    analyticEventStream.foreachRDD(rdd => {
      rdd.take(11).foreach(
        element =>
          println(s"Saving to ES $element"))
      EsSpark.saveJsonToEs(rdd, esIndex)
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}
{code}

Stripped down to the bare minimum. Basically, I'm reading Avro-format messages from Kafka, enriching them after the read, and persisting them to Elasticsearch (the enrichment process is the same in both cases). There's a checkpoint in mode 2 but not in mode 1. I hope this can help.
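
For clarity, here is a minimal sketch of the StreamingContext.getOrCreate checkpoint-recovery pattern that mode 2 relies on. It is illustrative only and not taken from the application above: the object name, checkpoint path and the placeholder socket source are made up.
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Illustrative path; in practice this should be a reliable filesystem such as HDFS.
  val checkpointDirectory = "/tmp/streaming-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("CheckpointRecoverySketch")
      .setIfMissing("spark.master", "local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder source and output operation; in mode 2 this is where the Kafka
    // direct stream and the Elasticsearch sink are defined.
    ssc.socketTextStream("localhost", 9999).count().print()

    ssc.checkpoint(checkpointDirectory)
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First run: no checkpoint data exists, so createContext() is called and the
    // DStream graph is built from scratch. On restart: the graph (and, for a direct
    // Kafka stream, the consumed offsets) is restored from the checkpoint and
    // createContext() is not called again.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
This is why the whole DStream graph is built inside createContext() in mode 2: on recovery Spark restores the graph from the checkpoint instead of invoking the factory again.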


> KafkaUtils.createDirectStream - unreasonable processing time in absence of load
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-7122
>                 URL: https://issues.apache.org/jira/browse/SPARK-7122
>             Project: Spark
>          Issue Type: Question
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>            Priority: Minor
>         Attachments: 10.second.window.fast.job.txt, 5.second.window.slow.job.txt, SparkStreamingJob.scala
>
>
> Attached is the complete source code of a test Spark job. No external data generators are run - the mere presence of a Kafka topic named "raw" suffices.
> The Spark job is run with no load whatsoever; http://localhost:4040/streaming is checked to obtain the job processing duration.
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.window(Seconds(40), Seconds(5))
>     abc.print()
> {code}
> the median processing time is 3 seconds 80 ms
> * in case the test contains the following transformation:
> {code}
>     // dummy transformation
>     val temperature = bytes.filter(_._1 == "abc")
>     val abc = temperature.map(x => (1, x))
>     abc.print()
> {code}
> the median processing time is just 50 ms
> Please explain why the "window" transformation introduces such an increase in job duration.
> Note: the result is the same regardless of the number of Kafka topic partitions (I've tried 1 and 8).
> Note 2: the result is the same regardless of the window parameters (I've tried (20, 2) and (40, 5)).
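
For readers without the attachment, here is a minimal, self-contained sketch of the two variants being compared. It is an assumption-laden reconstruction, not the attached SparkStreamingJob.scala: the broker address, the String decoders and the 5-second batch interval are guesses; only the topic name "raw", the filter and the two transformations come from the description above.
{code}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowVsMapSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WindowVsMapSketch")
      .setIfMissing("spark.master", "local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Direct stream over the "raw" topic; broker address and decoders are assumptions.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val bytes = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("raw"))

    val temperature = bytes.filter(_._1 == "abc")

    // Variant 1: windowed stream (reported median processing time: 3 s 80 ms).
    val windowed = temperature.window(Seconds(40), Seconds(5))
    windowed.print()

    // Variant 2: plain map (reported median processing time: 50 ms).
    // Swap the two lines above for these to compare.
    // val mapped = temperature.map(x => (1, x))
    // mapped.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}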


