Posted to user@spark.apache.org by KhajaAsmath Mohammed <md...@gmail.com> on 2018/01/16 21:04:38 UTC

Spark Streaming not reading missed data

Hi,

My Spark Streaming job reading from Kafka is not picking up missed messages: when the streaming job is stopped for 2 hours, it always resumes from the latest offsets instead of the offsets recorded in the checkpoint directory. Any suggestions on how to also process the old messages when there is a shutdown or planned maintenance?

      val topics = config.getString(Constants.Properties.KafkaTopics)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))
      val sparkSession: SparkSession = runMode match {
        case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
        case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
      }
      val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))
      streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
      val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
      lazy val sqlCont = sparkSession.sqlContext

      hiveDBInstance = config.getString("hiveDBInstance")

      TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)

      //transformDstreamData(sparkSession,datapointDStream,runMode,includeIndex,indexNum,datapointTmpTableName,fencedDPTmpTableName,fencedVINDPTmpTableName);
      streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
      streamingContext.start()
      streamingContext.awaitTermination()
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)

Thanks,
Asmath

Re: Spark Streaming not reading missed data

Posted by "vijay.bvp" <bv...@gmail.com>.
You are creating a new StreamingContext from scratch each time the job starts:

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))

If you want fault tolerance, that is, to resume from where the job stopped between restarts, the correct way is to restore the StreamingContext from the checkpoint directory:

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
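
As a rough sketch, functionToCreateContext would build the whole DStream pipeline from the original post and enable checkpointing on the new context; it is only invoked when there is no checkpoint to recover from. Names such as config, Constants.Properties, kafkaParams, topicsSet, sqlCont and TransformDatapoint are taken from the original post, and the overall wiring is an assumption, not a drop-in implementation:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: adapt to your own setup before using.
def functionToCreateContext(): StreamingContext = {
  // Runs only when no checkpoint exists, so all DStream setup
  // has to live inside this function.
  val ssc = new StreamingContext(sparkSession.sparkContext,
    Seconds(config.getInt(Constants.Properties.BatchInterval)))
  ssc.checkpoint(config.getString(Constants.Properties.CheckPointDir))

  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)
  val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
  TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode,
    includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName,
    fencedVINDPTmpTableName, hiveDBInstance)
  ssc
}

// Restore from the checkpoint directory if it exists, otherwise build a fresh context
val streamingContext = StreamingContext.getOrCreate(
  config.getString(Constants.Properties.CheckPointDir), functionToCreateContext _)
streamingContext.start()
streamingContext.awaitTermination()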

Please refer to the checkpointing section of the streaming programming guide for how to achieve fault tolerance in case of driver failures:
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>

hope this helps








Re: Spark Streaming not reading missed data

Posted by KhajaAsmath Mohammed <md...@gmail.com>.
Sometimes I get this message in the logs but the job still runs. Do you have a solution for how to fix this? I have added the code in my earlier email.

Exception in thread "pool-22-thread-9" java.lang.NullPointerException
        at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

On Tue, Jan 16, 2018 at 3:16 PM, Jörn Franke <jo...@gmail.com> wrote:

> It could be a missing persist before the checkpoint

Re: Spark Streaming not reading missed data

Posted by Jörn Franke <jo...@gmail.com>.
It could be a missing persist before the checkpoint
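
If that is the case, a minimal sketch of what persisting the DStream before checkpointing it might look like (datapointDStream is the stream from the original post; the storage level and checkpoint interval are assumptions):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

// Sketch only: cache the transformed DStream so its RDDs are not recomputed
// when the streaming checkpoint is written.
datapointDStream.persist(StorageLevel.MEMORY_AND_DISK_SER)
datapointDStream.checkpoint(Seconds(60)) // checkpoint interval is an assumption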


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org