Posted to user@spark.apache.org by KhajaAsmath Mohammed <md...@gmail.com> on 2018/01/16 21:04:38 UTC
Spark Streaming not reading missed data
Hi,
My Spark Streaming job that reads from Kafka does not pick up the missed
messages: after the streaming job has been stopped for 2 hours, it always
starts from the latest offsets. It is not picking up the offsets that still
need to be processed from the checkpoint directory. Any suggestions on how to
process the old messages as well after a shutdown or planned maintenance?
val topics = config.getString(Constants.Properties.KafkaTopics)
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> config.getString(Constants.Properties.KafkaBrokerList))
val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn" => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)
val datapointDStream = messages.map(_._2).map(TransformDatapoint.parseDataPointText)
lazy val sqlCont = sparkSession.sqlContext
hiveDBInstance = config.getString("hiveDBInstance")
TransformDatapoint.readDstreamData(sparkSession, sqlCont, datapointDStream, runMode, includeIndex, indexNum, datapointTmpTableName, fencedDPTmpTableName, fencedVINDPTmpTableName, hiveDBInstance)
//transformDstreamData(sparkSession,datapointDStream,runMode,includeIndex,indexNum,datapointTmpTableName,fencedDPTmpTableName,fencedVINDPTmpTableName);
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
Thanks,
Asmath
Re: Spark Streaming not reading missed data
Posted by "vijay.bvp" <bv...@gmail.com>.
You are creating a new streaming context each time:
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(config.getInt(Constants.Properties.BatchInterval)))
If you want fault tolerance, i.e. to continue reading from where the job
stopped between Spark job restarts, the correct way is to restore the
streaming context from the checkpoint directory:
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Please refer to the checkpointing section of the Spark Streaming programming
guide for details on checkpointing and on achieving fault tolerance in case
of driver failures:
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing>
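A fuller sketch of the getOrCreate pattern, adapted to the Kafka 0.8 direct stream API (StringDecoder) used in the original job, may make this concrete. It assumes the spark-streaming-kafka-0-8 artifact is on the classpath; the object name CheckpointedKafkaJob, the checkpoint path, broker list, topic name and batch interval are hypothetical placeholders (the original job reads them from config), and the println is only a stand-in for the real TransformDatapoint processing. The essential point is that every DStream is defined inside the function passed to getOrCreate, so on restart the DStream graph, including the Kafka offsets it had reached, is recovered from the checkpoint instead of being rebuilt from the latest offsets.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object CheckpointedKafkaJob {
  // Hypothetical placeholders; the original job reads these from config.
  val checkpointDir = "hdfs:///tmp/datapoint-checkpoint"
  val brokerList = "broker1:9092"
  val topicsSet = Set("datapoints")
  val batchSeconds = 60

  def kafkaParams(brokers: String): Map[String, String] = Map(
    "metadata.broker.list" -> brokers,
    // Consulted only when no checkpoint exists: start from the earliest
    // available offsets instead of the default (latest).
    "auto.offset.reset" -> "smallest")

  // Called by getOrCreate only when the checkpoint directory holds no usable
  // checkpoint, so every DStream must be defined in here, not outside.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedKafkaJob")
    val ssc = new StreamingContext(conf, Seconds(batchSeconds))
    ssc.checkpoint(checkpointDir)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams(brokerList), topicsSet)

    messages.map(_._2).foreachRDD { rdd =>
      // Stand-in for the real processing (e.g. TransformDatapoint.readDstreamData).
      println(s"processed ${rdd.count()} records")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context (and the offsets it had reached) from the checkpoint
    // if one exists; otherwise build a fresh one via createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

One caveat: a checkpoint generally cannot be recovered once the application code has changed, so for a redeploy with new code the offsets would have to be tracked some other way.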
Hope this helps.
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org
Re: Spark Streaming not reading missed data
Posted by KhajaAsmath Mohammed <md...@gmail.com>.
Sometimes I get these messages in the logs, but the job still runs. Do you
have a solution for how to fix this? I have added the code in my earlier email.
Exception in thread "pool-22-thread-9" java.lang.NullPointerException
    at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:233)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
On Tue, Jan 16, 2018 at 3:16 PM, Jörn Franke <jo...@gmail.com> wrote:
> It could be a missing persist before the checkpoint
Re: Spark Streaming not reading missed data
Posted by Jörn Franke <jo...@gmail.com>.
It could be a missing persist before the checkpoint.