Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2017/08/26 20:38:00 UTC

[jira] [Resolved] (SPARK-21844) Checkpointing issue in Spark Streaming involving Dataframes

     [ https://issues.apache.org/jira/browse/SPARK-21844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-21844.
-------------------------------
    Resolution: Invalid

"Why isn't this working" questions are better for StackOverflow or the mailing list.

> Checkpointing issue in Spark Streaming involving Dataframes
> -----------------------------------------------------------
>
>                 Key: SPARK-21844
>                 URL: https://issues.apache.org/jira/browse/SPARK-21844
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0, Kafka 0.10
>            Reporter: Chandramouli Muthukumaran
>
> I have recently started working with Spark Streaming and am implementing checkpointing, with the checkpoint stored in HDFS. When the streaming job fails, it is able to go back to the last checkpoint, but it then throws a NullPointerException and the streaming job is killed. I can see the checkpoints in HDFS, so I'm not sure why I'm getting the exception even though a checkpoint exists there. Any input would be helpful. I'm not sure if this is a bug.
> {code:java}
> package ca.twitter2
> 
> import org.apache.kafka.clients._
> import org.apache.kafka._
> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> import org.apache.log4j.{Level, Logger}
> import java.util.HashMap
> 
> object NGINXLogProcessingWindowedwithcheckpointv2 {
> 
>   case class AccessLog(Datetime: String, requesterip: String, httpcode: String, method: String, serverip2: String, responsetime: String, operation: String, application: String)
> 
>   val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint6"
>   val WINDOW_LENGTH = Seconds(43200)
>   val SLIDE_INTERVAL = Seconds(120)
> 
>   def creatingFunc(): StreamingContext = {
>     println("Creating new context")
>     val sparkConf = new SparkConf()
>       .setAppName("NGINXLogAnalysiswindowedwithcheckpoint")
>       .setMaster("local")
>     val ssc = new StreamingContext(sparkConf, Seconds(120))
>     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     //val checkpointDir = "hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccesslogcheckpoint"
>     ssc.checkpoint(checkpointDir)
> 
>     val spark = SparkSession
>       .builder()
>       .getOrCreate()
> 
>     val topics = List("REST").toSet
>     //Logger.getLogger("org").setLevel(Level.ERROR)
>     //Logger.getLogger("akka").setLevel(Level.ERROR)
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "10.24.18.36:6667",
>       //"bootstrap.servers" -> "10.71.52.119:9092",
>       //"bootstrap.servers" -> "192.168.123.36:6667",
>       "group.id" -> "2",
>       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
>       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
> 
>     // Create the direct stream with the Kafka parameters and topics
>     val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
>     val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
>     //kafkaStream.checkpoint(Seconds(600))
>     val lines = kafkaStream.map(_.value()).repartition(4)
>     val lineswindowed = lines.window(WINDOW_LENGTH, SLIDE_INTERVAL)
>     val lines2 = lineswindowed.map(_.split(","))
>     val lines4slide = lines2.map(p => AccessLog(p(0), p(2).toString, p(4).toString, p(3).toString, p(8).toString, p(7).toString, p(10), p(12)))
> 
>     lines4slide.foreachRDD { rdd2 =>
>       if (!rdd2.isEmpty) {
>         val count = rdd2.count
>         println("count received " + count)
>         import org.apache.spark.sql.functions._
>         import spark.implicits._
>         rdd2.count
>         rdd2.checkpoint
> 
>         val LogDF = rdd2.toDF()
>         LogDF.createOrReplaceTempView("Log")
>         val LogDFslide = LogDF.select($"Datetime", $"requesterip".cast("string"), $"httpcode", expr("(split(method, ' '))[1]").cast("string").as("request"), expr("(split(method, ' '))[2]").cast("string").as("webserviceurl"), expr("(split(method, ' '))[3]").cast("string").as("protocol"), $"serverip2", $"responsetime", expr("(split(operation, '/'))[4]").cast("string").as("operationtype"), $"application".cast("string"))
>         LogDFslide.createOrReplaceTempView("LogDFslide")
>         //LogDFslide.printSchema()
>         //LogDFslide.show
>         val Log2DFslide = spark.sql("SELECT Datetime, requesterip, httpcode, substring(request,2,length(request)) as request2, webserviceurl, protocol, serverip2, split(webserviceurl, '/')[3] as webservice3, responsetime, substring(operationtype,1,length(operationtype)-4) as httpsoapaction, application FROM LogDFslide")
>         Log2DFslide.createOrReplaceTempView("Log2DFslide")
>         val Log2DFslideoutput = spark.sql("SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice, responsetime, httpsoapaction, application FROM Log2DFslide")
>         //Log2DFslide.show
>         //println("printing line3")
>         //Log2DFslideoutput.show
>         //Log2DFslideoutput.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogWindowedcheckpointed")
>         val log2DFFilter = spark.sql("SELECT Datetime, requesterip, httpcode, request2, webserviceurl, protocol, serverip2, split(webservice3, '[?]')[0] as webservice2, responsetime, httpsoapaction, application FROM Log2DFslide WHERE responsetime <> '-' AND responsetime <> ''")
>         log2DFFilter.createOrReplaceTempView("log2DFFilter")
>         //log2DFFilter.printSchema()
>         log2DFFilter.show
>         val Log3DFslide = spark.sql("SELECT initcap(webservice2) as webservice, round(avg(responsetime),4) as Averageresponsetime FROM log2DFFilter WHERE webservice2 <> '' GROUP BY initcap(webservice2)")
>         //val Log3DFslide = log2DFFilter.select(expr("initcap(webservice2)"), expr("round(avg(responsetime),4)").as("Averageresponsetime")).groupBy(expr("initcap(webservice2)"))
>         //Log3DFslide.printSchema()
>         Log3DFslide.createOrReplaceTempView("Log3DFslide")
>         Log3DFslide.show
>         //Log3DFslide.write.mode(SaveMode.Overwrite).orc("hdfs://ln461.alt.calgary.ca:8020/user/NGINXAccessLogstatistics")
>       }
>     }
>     ssc
>   }
> 
>   def main(args: Array[String]): Unit = {
>     val context = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc _)
>     //val ssc = StreamingContext.getOrCreate(checkpointDir, () => { creatingFunc(checkpointDir) })
>     context.start()
>     context.awaitTermination()
>   }
> }
> {code}
> I get the following error:
> {code:java}
> 17/08/26 13:41:00 ERROR JobScheduler: Error running job streaming job 1503776400000 ms.0
> java.lang.NullPointerException
> 	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
> 	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
> 	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.lang.Thread.run(Unknown Source)
> Exception in thread "main" java.lang.NullPointerException
> 	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:165)
> 	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:69)
> 	at ca.twitter2.NGINXLogProcessingWindowedwithcheckpointv2$$anonfun$creatingFunc$1.apply(NGINXLogProcessingWindowedwithcheckpointv2.scala:60)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	at java.lang.Thread.run(Unknown Source)
> {code}
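> For what it's worth, the "DataFrame and SQL Operations" example in the Spark Streaming programming guide obtains the SparkSession lazily inside foreachRDD, from the RDD's SparkContext, instead of capturing a session built in the context-creation function. My understanding (which may be wrong) is that a captured session is not restored when the StreamingContext is recovered from a checkpoint, which would match the NullPointerException in rddToDatasetHolder. A minimal sketch of that pattern adapted to this job (SparkSessionSingleton follows the guide's example; everything else is illustrative and not tested against this failure):
> {code:java}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> 
> // Lazily instantiated singleton SparkSession, as in the Spark Streaming
> // programming guide, so it can be re-created after checkpoint recovery.
> object SparkSessionSingleton {
>   @transient private var instance: SparkSession = _
>   def getInstance(sparkConf: SparkConf): SparkSession = {
>     if (instance == null) {
>       instance = SparkSession.builder.config(sparkConf).getOrCreate()
>     }
>     instance
>   }
> }
> 
> lines4slide.foreachRDD { rdd2 =>
>   if (!rdd2.isEmpty) {
>     // Get the session from the RDD's SparkContext rather than using the
>     // `spark` val captured from creatingFunc(); that captured reference is
>     // (as far as I can tell) null after recovery from the checkpoint.
>     val spark = SparkSessionSingleton.getInstance(rdd2.sparkContext.getConf)
>     import spark.implicits._
>     val LogDF = rdd2.toDF()
>     // ... same DataFrame/SQL processing as above ...
>   }
> }
> {code}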



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
