Posted to user@beam.apache.org by "Sunny, Mani Kolbe" <Su...@DNB.com> on 2020/06/24 12:13:59 UTC

KinesisIO checkpointing

Hello,

We are developing a Beam pipeline which runs on the SparkRunner in streaming mode. This pipeline reads from Kinesis, does some translations and filtering, and finally writes the output to S3 using the AvroIO writer. We are using fixed windows with triggers based on element count and processing-time intervals. The output path is partitioned by the window start timestamp. allowedLateness=0sec
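Roughly, the pipeline has the following shape (a simplified sketch only, not our real code: the stream name, output path, the Event class and its decoder are placeholders, and the AWS region/credentials configuration is omitted):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class StreamPipelineSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply("ReadKinesis", KinesisIO.read()
            .withStreamName("my-input-stream")                          // placeholder
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
            /* AWS region/credentials configuration omitted */)
     .apply("Decode", ParDo.of(new DoFn<KinesisRecord, Event>() {       // Event is a placeholder Avro type
            @ProcessElement
            public void process(ProcessContext c) {
              c.output(Event.fromBytes(c.element().getDataAsBytes()));  // placeholder decoder
            }
          }))
     .apply("Window", Window.<Event>into(FixedWindows.of(Duration.standardSeconds(60)))
            .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(1000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(60)))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
     .apply("WriteAvro", AvroIO.write(Event.class)
            .to("s3://my-bucket/out/events")   // the real pipeline partitions by window start timestamp
            .withWindowedWrites()
            .withNumShards(1));

    p.run().waitUntilFinish();
  }
}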

This is working fine, but I have noticed that whenever we restart streaming, the application starts reading from the Kinesis TRIM_HORIZON. That is, it does not resume from the last checkpoint position. I then found that the checkpoint directory is based on the --jobName and --checkpointDir properties, so I tried running as below:

spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner \
    --jobName=PrimeStreamProcessor \
    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor/checkpoint \
    --useWindow=true \
    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch the checkpoint data from the checkpointDir path provided. But when the driver tries to broadcast this information to the executors, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any idea? Is resuming from the checkpoint position on application restart actually supported by KinesisIO?

Regards,
Mani



RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
Hi Alexey,

It is MetricsContainerStepMapAccumulator that is throwing the “Accumulator must be registered” error, if that helps.

Regards,
Mani

From: Alexey Romanenko <ar...@gmail.com>
Sent: Friday, July 10, 2020 5:37 PM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


It looks very strange because, exactly for this reason, in streaming pipelines we initialise the accumulators before starting the pipeline [1].

Btw, why do you set “--maxReadTime=-1”? Can you try to run without it?

[1] https://github.com/apache/beam/blob/44e7e95c73dbf9ed6152a4cda772be9f64b02c72/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java#L202


On 10 Jul 2020, at 16:27, Sunny, Mani Kolbe <Su...@DNB.com> wrote:

This is what is happening now: the first time I run the pipeline, it runs fine. If I stop and start it, it attempts to recover from the checkpoint info stored in checkpointDir and fails with an error. I am using Spark 2.4.0 and Beam 2.22.0.

Spark Submit command used:
APPNAME=test1
spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner --localStackMode=false \
    --jobName=PrimeStreamProcessor-${APPNAME} \
    --checkpointDir=hdfs:///tmp/${APPNAME}/checkpoint \
    --awsRegion=us-east-1 \
    --inputStreamName=gts_in_stream_${APPNAME} \
    --outputStreamName=publish_in_stream_${APPNAME} \
    --outputPath=hdfs:///tmp/mani/${APPNAME}/out \
    --sedimentTable=${APPNAME}-ORG_ALERTS-ORG_ALERTS-bq-sediment \
    --useWindow=true \
    --streamFromHorizon=true \
    --windowDuration=60s --windowLateness=10s --windowElementCount=1 \
    --updateSediment=true \
    --useSedimentLookups=true \
    --outputPendToFile=true \
    --outputPendToStream=true \
    --primePipeline=ORG_ALERTS --primeCategory=ORG_ALERTS \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch the checkpoint data from the checkpointDir path provided. But when it tries to broadcast this information, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)

As far as I understand, this is what is happening: the Beam SDK needs to continuously publish checkpoints/watermarks. On the SparkRunner, it uses Spark's Accumulator to achieve this (MetricsContainerStepMapAccumulator?).

In Spark, you need to register your Accumulators upfront so that they can be serialized. The SparkRunner does this, but for whatever reason the accumulator is being sent to the executors before it has been registered.

This happens in streaming mode with --jobName provided.
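To double-check the Spark rule itself (outside Beam), a minimal Spark-only sketch reproduces the same message: an AccumulatorV2 captured in a closure without being registered with the SparkContext fails at closure-serialization time.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorRegistrationDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("acc-demo").setMaster("local[1]"));

    LongAccumulator registered = jsc.sc().longAccumulator("count"); // created AND registered
    LongAccumulator unregistered = new LongAccumulator();           // never registered

    // Works: the accumulator was registered before being shipped to executors.
    jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> registered.add(1L));

    // Fails while serializing the closure with
    // java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> unregistered.add(1L));

    jsc.stop();
  }
}

In our case the accumulator in question is Beam's MetricsContainerStepMapAccumulator, which the SparkRunner is supposed to register before the streaming context starts.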

20/07/10 13:31:38 ERROR org.apache.spark.streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: Task not serializable
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               …
…
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
               at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
               at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
               at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:208)
               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:317)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:681)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:679)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:46)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               …
              …
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.immutable.List.foreach(List.scala:392)
               at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
               at scala.collection.immutable.List.map(List.scala:296)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.util.ThreadUtils$$anon$2.run(ThreadUtils.scala:145)
20/07/10 13:31:38 WARN org.apache.spark.streaming.util.BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
20/07/10 13:31:38 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.executeStreamProcessing(PrimeStreamProcessor.java:123)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.main(PrimeStreamProcessor.java:140)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)

From: Sunny, Mani Kolbe <Su...@DNB.com>
Sent: Friday, July 10, 2020 11:43 AM
To: user@beam.apache.org
Subject: RE: KinesisIO checkpointing


If I don't set withInitialPositionInStream(), it throws an NPE.

Read kinesisReader = KinesisIO.read()
                //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
                .withStreamName(options.getInputStreamName());

The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.

org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
               at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
               at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
               at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
               at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
               at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
               at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
               at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
               at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
               at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
               at org.apache.spark.scheduler.Task.run(Task.scala:121)
               at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
               at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
               at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
               at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)
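For reference, a sketch of the ways a starting point can be given to KinesisIO.read() (options.getInputStreamName() is just our pipeline option; the timestamp variant is another method on the same KinesisIO.Read builder):

KinesisIO.Read reader = KinesisIO.read()
        .withStreamName(options.getInputStreamName());

// Symbolic positions: the oldest retained record, or only records arriving from now on.
reader = reader.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
// reader = reader.withInitialPositionInStream(InitialPositionInStream.LATEST);

// Or a point in time: read records at or after this timestamp.
// reader = reader.withInitialTimestampInStream(Instant.now().minus(Duration.standardHours(1)));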

From: Mani Kolbe <ma...@gmail.com>
Sent: Thursday, July 9, 2020 10:56 PM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


Is it required to set the jobName and checkpointDir options for checkpointing to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com> wrote:
BoundedReadFromUnboundedSource does checkpoint the underlying UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <Su...@dnb.com> wrote:
We did the same: we started using maxReadTime and put the application on a recurring 5-minute schedule. It works fine end to end without any errors.

But the problem is that it always starts reading from the beginning of the Kinesis stream whenever it is stopped and restarted.

When I investigated, I found that when you set maxReadTime, the read runs via BoundedReadFromUnboundedSource. That essentially converts the source into a bounded one, which means checkpointing and watermarks are no longer supported; the reader just reads for the given amount of time and exits.
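In code, the two modes differ roughly like this (a sketch only; options.getInputStreamName() stands in for however the stream name is configured):

KinesisIO.Read source = KinesisIO.read()
        .withStreamName(options.getInputStreamName())
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

// Bounded: capped by read time and/or record count, wrapped in BoundedReadFromUnboundedSource,
// so the Kinesis checkpoint mark is not persisted across runs.
PCollection<KinesisRecord> bounded = p.apply(
        source.withMaxReadTime(Duration.standardMinutes(5))
              .withMaxNumRecords(100000));

// Unbounded: no caps; checkpointing is left to the runner (SparkRunner streaming).
PCollection<KinesisRecord> unbounded = p.apply(source);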

Is there any recommended way to resume reading from the position where it finished, either using maxReadTime or in unbounded-source mode?

Could someone point me to sample pipeline code that uses Kinesis as a source?

Regards,
Mani

From: Lars Almgren Schwartz <la...@tink.com>
Sent: Thursday, June 25, 2020 7:53 AM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


We had the exact same problem but have not spent any time trying to solve it; we just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.

On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <Su...@dnb.com> wrote:
We are on Spark 2.4 and Beam 2.22.0.

From: Alexey Romanenko <ar...@gmail.com>
Sent: Wednesday, June 24, 2020 5:15 PM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing


Yes, KinesisIO supports restarting from checkpoints, and it is based on the runner's checkpoint support [1].

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
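For example, the same runner checkpoint settings can also be set programmatically, something like the following (a sketch using the SparkPipelineOptions accessors; all values are placeholders):

SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
options.setJobName("PrimeStreamProcessor");                               // checkpoint path is derived from this
options.setCheckpointDir("hdfs:///tmp/PrimeStreamProcessor/checkpoint");
options.setCheckpointDurationMillis(60000L);                              // how often streaming state is checkpointed

Pipeline p = Pipeline.create(options);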



RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
Also, when running in “batch” mode, is there a way to resume from the last read position in Kinesis after an application restart?

I tried using the withTimestamp() option. The plan was to take the ApproximateArrivalTimestamp from the last read record and pass it to withTimestamp(). However, the Kinesis documentation [1] says: “There are no guarantees about the accuracy of ApproximateArrivalTimestamp, or that the time stamp is always increasing. For example, records in a shard or across a stream might have time stamps that are out of order.”

Is there any alternative approach?

[1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
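The best I can come up with is an approximate resume: persist the maximum ApproximateArrivalTimestamp we have seen, subtract a safety margin, and feed it back in on the next run. A rough sketch (the resume-timestamp store is hypothetical, and because the timestamp is not monotonic we would still need downstream de-duplication):

// Approximate resume-by-timestamp for a batch-style run (sketch only).
Instant resumeFrom = loadLastArrivalTimestamp()            // hypothetical store lookup
        .minus(Duration.standardMinutes(5));               // margin for out-of-order timestamps

PCollection<KinesisRecord> records = p.apply(
        KinesisIO.read()
                .withStreamName(options.getInputStreamName())
                .withInitialTimestampInStream(resumeFrom)
                .withMaxReadTime(Duration.standardMinutes(5)));

// While processing, track the max of record.getApproximateArrivalTimestamp()
// and persist it for the next run.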

From: Sunny, Mani Kolbe <Su...@DNB.com>
Sent: Friday, July 10, 2020 6:20 PM
To: user@beam.apache.org
Subject: RE: KinesisIO checkpointing


Leaving --maxReadTime unset or setting it to -1 has the same effect; I have tried both.

               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)

From: Sunny, Mani Kolbe <Su...@DNB.com>>
Sent: Friday, July 10, 2020 11:43 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: RE: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

If I don’t set withInitialPositionInStream(), it throws an NPE.

Read kinesisReader = KinesisIO.read()
                //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
                .withStreamName(options.getInputStreamName());

The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.

org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
               at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
               at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
               at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
               at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
               at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
               at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
               at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
               at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
               at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
               at org.apache.spark.scheduler.Task.run(Task.scala:121)
               at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
               at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
               at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
               at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)
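
For reference, here is a minimal sketch of the same read with the initial position set explicitly, which is what avoids this NPE. The options accessors are from the pipeline options above; the credentials/region wiring and the awsAccessKey/awsSecretKey names are illustrative placeholders only:

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.io.kinesis.KinesisIO;

KinesisIO.Read kinesisReader = KinesisIO.read()
        .withStreamName(options.getInputStreamName())
        // Explicit starting position; omitting this is what triggers the NPE above.
        .withInitialPositionInStream(options.getStreamFromHorizon()
                ? InitialPositionInStream.TRIM_HORIZON
                : InitialPositionInStream.LATEST)
        // Illustrative credentials/region wiring; adapt to your environment.
        .withAWSClientsProvider(awsAccessKey, awsSecretKey, Regions.US_EAST_1);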

From: Mani Kolbe <ma...@gmail.com>>
Sent: Thursday, July 9, 2020 10:56 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Is it required to set the jobName and checkpointDir options for checkpointing to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com>> wrote:
The BoundedReadFromUnboundedReader does checkpoint the underlying UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We did the same: we started using maxReadTime and put the application on a recurring 5-minute schedule. It works fine end to end without any errors.

But the problem is that it always starts reading from the beginning of the Kinesis stream whenever it is stopped and restarted.

When I did some investigation, I found that when you set maxReadTime, the read runs in BoundedReadFromUnboundedSource mode. That essentially converts the source into a bounded one, which means checkpointing and watermarks are no longer supported; the reader just reads for x amount of time and exits.
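
To illustrate the difference, a rough sketch (not our actual pipeline; p, streamName, and the initial position are placeholders). With a read limit the runner wraps the source in BoundedReadFromUnboundedSource and there is nothing to resume from; without a limit it stays an unbounded source, which is what runner-level checkpointing can resume:

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.joda.time.Duration;

// Bounded: read for at most 5 minutes (or until withMaxNumRecords is hit) and exit.
p.apply(KinesisIO.read()
        .withStreamName(streamName)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        .withMaxReadTime(Duration.standardMinutes(5)));

// Unbounded: no read limit, so the source keeps running and its checkpoint
// marks can be persisted by the runner.
p.apply(KinesisIO.read()
        .withStreamName(streamName)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON));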

Is there any recommended way to resume reading from the position where it finished, either with maxReadTime or in unbounded-source mode?

Could someone point me to sample pipeline code that uses Kinesis as a source?

Regards,
Mani

From: Lars Almgren Schwartz <la...@tink.com>>
Sent: Thursday, June 25, 2020 7:53 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

We had the exact same problem, but have not spent any time trying to solve it; we just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.

On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We are on Spark 2.4 and Beam 2.22.0.

From: Alexey Romanenko <ar...@gmail.com>>
Sent: Wednesday, June 24, 2020 5:15 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Yes, KinesisIO supports restarting from checkpoints, and it relies on the runner’s checkpoint support [1].

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
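
For reference, the runner-side checkpointing referred to here is configured through SparkPipelineOptions; a minimal sketch (the path is illustrative, the usual main(String[] args) entry point is assumed, and the same values can be passed on the command line as --streaming/--checkpointDir):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions sparkOptions = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
sparkOptions.setStreaming(true);                                    // run as a streaming pipeline
sparkOptions.setCheckpointDir("hdfs:///tmp/PrimeStreamProcessor");  // where Spark stores its checkpoints
// On restart with the same jobName and checkpointDir, the runner recovers the
// streaming context from this directory, which is what [1] describes.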

On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <Su...@DNB.com>> wrote:

Hello,

We are developing a beam pipeline which runs on SparkRunner on streaming mode. This pipeline read from Kinesis, do some translations, filtering and finally output to S3 using AvroIO writer. We are using Fixed windows with triggers based on element count and processing time intervals. Outputs path is partitioned by window start timestamp. allowedLateness=0sec

This is working fine, but I have noticed that whenever we restarts streaming, application is starting to read from Kinesis TRIM_HORIZON. That is, it is not resuming from last checkpoint position. Then I found that the checkpoint directory is based on --jobName and --checkpointDir properties. So I tried running as below:

spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner \
    --jobName=PrimeStreamProcessor \
    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
    --useWindow=true \
    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch checkpoint data from checkpointDir path provided. But When the driver tries to broadcast this information to executors, it is failing with below exception.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any idea? Is resuming from checkpoint position on application restart is actually supported on KinesisIO?

Regards,
Mani


RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
--maxReadTime being either not present or set to -1 has the same effect. I have tried both.

From: Alexey Romanenko <ar...@gmail.com>
Sent: Friday, July 10, 2020 5:37 PM
To: user@beam.apache.org
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

It looks very strange because exactly for this reason, in streaming pipeline we initialise accumulators before starting a pipeline [1]

Btw, why do you set “--maxReadTime=-1”? Can you try to run without it?

[1] https://github.com/apache/beam/blob/44e7e95c73dbf9ed6152a4cda772be9f64b02c72/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java#L202


On 10 Jul 2020, at 16:27, Sunny, Mani Kolbe <Su...@DNB.com>> wrote:

This is what is happening now. If I run the pipeline, it runs fine the first time. If I stop and restart it, it attempts to recover from the checkpoint info stored in checkpointDir and fails with an error. I am using Spark 2.4.0 and Beam 2.22.0.

Spark Submit command used:
APPNAME=test1
spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner --localStackMode=false \
    --jobName=PrimeStreamProcessor-${APPNAME} \
    --checkpointDir=hdfs:///tmp/${APPNAME}/checkpoint \
    --awsRegion=us-east-1 \
    --inputStreamName=gts_in_stream_${APPNAME} \
    --outputStreamName=publish_in_stream_${APPNAME} \
    --outputPath=hdfs:///tmp/mani/${APPNAME}/out \
    --sedimentTable=${APPNAME}-ORG_ALERTS-ORG_ALERTS-bq-sediment \
    --useWindow=true \
    --streamFromHorizon=true \
    --windowDuration=60s --windowLateness=10s --windowElementCount=1 \
    --updateSediment=true \
    --useSedimentLookups=true \
    --outputPendToFile=true \
    --outputPendToStream=true \
    --primePipeline=ORG_ALERTS --primeCategory=ORG_ALERTS \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch checkpoint data from the checkpointDir path provided, but when it tries to broadcast this information, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)

As far as I understand, this is what is happening: the Beam SDK needs to continuously publish checkpoints/watermarks, and in SparkRunner mode it uses Spark's Accumulators to achieve this (MetricsContainerStepMapAccumulator?).

In Spark, you need to register your Accumulators up front for them to be serializable. This is done in the SparkRunner, but for whatever reason the accumulator is being serialized before it gets registered.

This happens in streaming mode with --jobName provided.
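
For reference, the Spark rule that this error enforces can be reproduced outside Beam with a plain accumulator; a standalone sketch (nothing here is Beam code, it only demonstrates the registration requirement):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorRegistrationDemo {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("acc-demo").setMaster("local[1]"));

        LongAccumulator registered = new LongAccumulator();
        jsc.sc().register(registered, "registered");            // registered on the driver first

        LongAccumulator unregistered = new LongAccumulator();    // never registered

        // Fine: the accumulator was registered before the closure was serialized.
        jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> registered.add(x.longValue()));

        // Throws java.lang.UnsupportedOperationException:
        // "Accumulator must be registered before send to executor"
        // while Spark serializes the closure (AccumulatorV2.writeReplace).
        jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> unregistered.add(x.longValue()));

        jsc.stop();
    }
}

The stack trace above matches the second case: the failure happens in SparkContext.clean / ClosureCleaner.ensureSerializable, before the job even starts.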

20/07/10 13:31:38 ERROR org.apache.spark.streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: Task not serializable
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               …
…
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
               at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
               at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
               at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:208)
               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:317)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:681)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:679)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:46)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               …
              …
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.immutable.List.foreach(List.scala:392)
               at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
               at scala.collection.immutable.List.map(List.scala:296)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.util.ThreadUtils$$anon$2.run(ThreadUtils.scala:145)
20/07/10 13:31:38 WARN org.apache.spark.streaming.util.BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
20/07/10 13:31:38 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.executeStreamProcessing(PrimeStreamProcessor.java:123)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.main(PrimeStreamProcessor.java:140)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)

From: Sunny, Mani Kolbe <Su...@DNB.com>>
Sent: Friday, July 10, 2020 11:43 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: RE: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

If I don’t set withInitialPositionInStream(), it throws an NPE.

Read kinesisReader = KinesisIO.read()
                //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
                .withStreamName(options.getInputStreamName());

The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.

org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
               at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
               at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
               at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
               at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
               at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
               at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
               at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
               at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
               at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
               at org.apache.spark.scheduler.Task.run(Task.scala:121)
               at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
               at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
               at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
               at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)

From: Mani Kolbe <ma...@gmail.com>>
Sent: Thursday, July 9, 2020 10:56 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Is it required to set the jobName and checkpointDir options for checkpointing to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com>> wrote:
The BoundedReadFromUnboundedReader does checkpoint the underlying UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We did the same: we started using maxReadTime and put the application on a recurring 5-minute schedule. It works fine end to end without any errors.

But the problem is that it always starts reading from the beginning of the Kinesis stream whenever it is stopped and restarted.

When I did some investigation, I found that when you set maxReadTime, the read runs in BoundedReadFromUnboundedSource mode. That essentially converts the source into a bounded one, which means checkpointing and watermarks are no longer supported; the reader just reads for x amount of time and exits.

Is there any recommended way to resume reading from the position where it finished, either with maxReadTime or in unbounded-source mode?

Could someone point me to sample pipeline code that uses Kinesis as a source?

Regards,
Mani

From: Lars Almgren Schwartz <la...@tink.com>>
Sent: Thursday, June 25, 2020 7:53 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

We had the exact same problem, but have not spent any time trying to solve it; we just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.

On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We are on Spark 2.4 and Beam 2.22.0.

From: Alexey Romanenko <ar...@gmail.com>>
Sent: Wednesday, June 24, 2020 5:15 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Yes, KinesisIO supports restarting from checkpoints, and it relies on the runner’s checkpoint support [1].

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838

On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <Su...@DNB.com>> wrote:

Hello,

We are developing a beam pipeline which runs on SparkRunner on streaming mode. This pipeline read from Kinesis, do some translations, filtering and finally output to S3 using AvroIO writer. We are using Fixed windows with triggers based on element count and processing time intervals. Outputs path is partitioned by window start timestamp. allowedLateness=0sec

This is working fine, but I have noticed that whenever we restarts streaming, application is starting to read from Kinesis TRIM_HORIZON. That is, it is not resuming from last checkpoint position. Then I found that the checkpoint directory is based on --jobName and --checkpointDir properties. So I tried running as below:

spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner \
    --jobName=PrimeStreamProcessor \
    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
    --useWindow=true \
    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch checkpoint data from checkpointDir path provided. But When the driver tries to broadcast this information to executors, it is failing with below exception.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any idea? Is resuming from checkpoint position on application restart is actually supported on KinesisIO?

Regards,
Mani


Re: KinesisIO checkpointing

Posted by Alexey Romanenko <ar...@gmail.com>.
It looks very strange because exactly for this reason, in streaming pipeline we initialise accumulators before starting a pipeline [1]

Btw, why do you set “--maxReadTime=-1”? Can you try to run without it?

[1] https://github.com/apache/beam/blob/44e7e95c73dbf9ed6152a4cda772be9f64b02c72/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java#L202

> On 10 Jul 2020, at 16:27, Sunny, Mani Kolbe <Su...@DNB.com> wrote:
> 
> This is what is happening now. If I run the pipeline, it runs fine the first time. If I stop and restart it, it attempts to recover from the checkpoint info stored in checkpointDir and fails with an error. I am using Spark 2.4.0 and Beam 2.22.0.
>  
> Spark Submit command used:
> APPNAME=test1
> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>     --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>     --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>     --conf spark.executor.extraClassPath=/etc/hbase/conf \
>     /tmp/stream-processor-0.0.0.8-spark.jar \
>     --runner=SparkRunner --localStackMode=false \
>     --jobName=PrimeStreamProcessor-${APPNAME} \
>     --checkpointDir=hdfs:///tmp/${APPNAME}/checkpoint \
>     --awsRegion=us-east-1 \
>     --inputStreamName=gts_in_stream_${APPNAME} \
>     --outputStreamName=publish_in_stream_${APPNAME} \
>     --outputPath=hdfs:///tmp/mani/${APPNAME}/out \
>     --sedimentTable=${APPNAME}-ORG_ALERTS-ORG_ALERTS-bq-sediment \
>     --useWindow=true \
>     --streamFromHorizon=true \
>     --windowDuration=60s --windowLateness=10s --windowElementCount=1 \
>     --updateSediment=true \
>     --useSedimentLookups=true \
>     --outputPendToFile=true \
>     --outputPendToStream=true \
>     --primePipeline=ORG_ALERTS --primeCategory=ORG_ALERTS \
>     --maxReadTime=-1 \
>     --streaming=true
>  
> I can see that it is able to fetch checkpoint data from the checkpointDir path provided, but when it tries to broadcast this information, it fails with the exception below.
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>         at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>         ....
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>  
> As far as I understand, this is what is happening: the Beam SDK needs to continuously publish checkpoints/watermarks, and in SparkRunner mode it uses Spark's Accumulators to achieve this (MetricsContainerStepMapAccumulator?).
> 
> In Spark, you need to register your Accumulators up front for them to be serializable. This is done in the SparkRunner, but for whatever reason the accumulator is being serialized before it gets registered.
> 
> This happens in streaming mode with --jobName provided.
>  
> 20/07/10 13:31:38 ERROR org.apache.spark.streaming.StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: Task not serializable
>                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
>                at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
>                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>                at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>                at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>                at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
>                at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
>                at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
>                at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
>                …
> …
>                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>                at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>                at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>                at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>                at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
>                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
>                at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>                at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>                at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
>                at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
>                at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
>                at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>                at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
>                at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
>                at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
>                at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:208)
>                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>                at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>                at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>                at java.lang.reflect.Method.invoke(Method.java:498)
>                at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
>                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>                at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>                at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>                at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
>                at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
>                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>                at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>                at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>                at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
>                at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
>                at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
>                at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
>                at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:317)
>                at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
>                at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:681)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:679)
>                at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:46)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>                at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>                at scala.Option.orElse(Option.scala:289)
>                …
>               …
>                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>                at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>                at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
>                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>                at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>                at scala.collection.immutable.List.foreach(List.scala:392)
>                at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>                at scala.collection.immutable.List.map(List.scala:296)
>                at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>                at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>                at scala.Option.orElse(Option.scala:289)
>                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>                at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>                at scala.Option.orElse(Option.scala:289)
>                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>                at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>                at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>                at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>                at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>                at scala.Option.orElse(Option.scala:289)
>                at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>                at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>                at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>                at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>                at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>                at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>                at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
>                at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
>                at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>                at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>                at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
>                at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
>                at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
>                at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
>                at org.apache.spark.util.ThreadUtils$$anon$2.run(ThreadUtils.scala:145)
> 20/07/10 13:31:38 WARN org.apache.spark.streaming.util.BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
> 20/07/10 13:31:38 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>                at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>                at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
>                at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
>                at com.dnb.optimus.prime.processor.PrimeStreamProcessor.executeStreamProcessing(PrimeStreamProcessor.java:123)
>                at com.dnb.optimus.prime.processor.PrimeStreamProcessor.main(PrimeStreamProcessor.java:140)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>                at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>                at java.lang.reflect.Method.invoke(Method.java:498)
>                at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
>                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>                at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>                at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>                at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
>                at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
>                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>                at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>                at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>                at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
>  
> From: Sunny, Mani Kolbe <SunnyM@DNB.com <ma...@DNB.com>> 
> Sent: Friday, July 10, 2020 11:43 AM
> To: user@beam.apache.org <ma...@beam.apache.org>
> Subject: RE: KinesisIO checkpointing
>  
>  
> If I don’t set withInitialPositionInStream(), it will throw an NPE.
>  
> Read kinesisReader = KinesisIO.read()
>                 //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
>                 .withStreamName(options.getInputStreamName());
>  
> The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.
>  
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>                at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>                at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>                at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>                at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>                at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
>                at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
>                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
>                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
>                at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>                at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>                at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>                at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
>                at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
>                at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>                at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
>                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
>                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>                at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>                at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>                at org.apache.spark.scheduler.Task.run(Task.scala:121)
>                at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>                at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>                at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>                at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
>                at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
>                at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
>                at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
>                at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)
>  
> From: Mani Kolbe <manikolbe@gmail.com <ma...@gmail.com>> 
> Sent: Thursday, July 9, 2020 10:56 PM
> To: user@beam.apache.org <ma...@beam.apache.org>
> Subject: Re: KinesisIO checkpointing
>  
>  
> Is it required to set the jobName and checkpointDir options for checkpointing to work?
>  
>  
>  
> On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lcwik@google.com <ma...@google.com>> wrote:
> The BoundedReadFromUnboundedReader does checkpoint the underlying UnboundedSource; is that checkpoint logic not working?
> Do you have KinesisIO configured to always read from a specific point?
>  
> On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <SunnyM@dnb.com <ma...@dnb.com>> wrote:
> We did the same: we started using maxReadTime and put the application on a recurring 5-minute schedule. It works fine end to end without any errors.
>  
> But the problem is that it always starts reading from the beginning of the Kinesis stream whenever it is stopped and restarted.
>  
> When I investigated, I found that when you set maxReadTime, it runs in BoundedReadFromUnboundedSource mode. That essentially converts the source into a bounded one, which means checkpointing and watermarks are no longer supported; the reader just reads for a fixed amount of time and exits.
>  
> Is there any recommended way to resume reading from the position where it finished, either using maxReadTime or in unbounded-source mode?
>  
> Could someone point me to sample pipeline code that uses Kinesis as a source?
>  
> Regards,
> Mani
>  
> From: Lars Almgren Schwartz <lars.almgren@tink.com <ma...@tink.com>> 
> Sent: Thursday, June 25, 2020 7:53 AM
> To: user@beam.apache.org <ma...@beam.apache.org>
> Subject: Re: KinesisIO checkpointing
>  
>  
> We had the exact same problem, but have not spent any time trying to solve it, we just skipped checkpointing for now.
> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.
>  
> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <SunnyM@dnb.com <ma...@dnb.com>> wrote:
> We are on spark 2.4 and Beam 2.22.0
>  
> From: Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> 
> Sent: Wednesday, June 24, 2020 5:15 PM
> To: user@beam.apache.org <ma...@beam.apache.org>
> Subject: Re: KinesisIO checkpointing
>  
>  
> Yes, KinesisIO supports restart from checkpoints and it’s based on runner checkpoints support [1].  
>  
> Could you specify which version of Spark and Beam you use?
>  
> [1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
>  
> 
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <SunnyM@DNB.com <ma...@DNB.com>> wrote:
>  
> Hello,
>  
> We are developing a beam pipeline which runs on SparkRunner on streaming mode. This pipeline read from Kinesis, do some translations, filtering and finally output to S3 using AvroIO writer. We are using Fixed windows with triggers based on element count and processing time intervals. Outputs path is partitioned by window start timestamp. allowedLateness=0sec
>  
> This is working fine, but I have noticed that whenever we restarts streaming, application is starting to read from Kinesis TRIM_HORIZON. That is, it is not resuming from last checkpoint position. Then I found that the checkpoint directory is based on --jobName and --checkpointDir properties. So I tried running as below:
>  
> spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
>     --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
>     --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
>     --conf spark.executor.extraClassPath=/etc/hbase/conf \
>     /tmp/stream-processor-0.0.0.8-spark.jar \
>     --runner=SparkRunner \
>     --jobName=PrimeStreamProcessor \
>     --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
>     --useWindow=true \
>     --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
>     --maxReadTime=-1 \
>     --streaming=true
>  
> I can see that it is able to fetch checkpoint data from checkpointDir path provided. But When the driver tries to broadcast this information to executors, it is failing with below exception.
> 20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
>         at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>         ....
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
> Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
>         at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>         at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>  
>  
> Any idea? Is resuming from checkpoint position on application restart is actually supported on KinesisIO?
>  
> Regards,
> Mani


RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
This is what is happening now. If I run the pipeline, it runs fine the first time. If I stop and restart it, it attempts to recover from the checkpoint info stored in checkpointDir and fails with an error. I am using Spark 2.4.0 and Beam 2.22.0.

Spark Submit command used:
APPNAME=test1
spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner --localStackMode=false \
    --jobName=PrimeStreamProcessor-${APPNAME} \
    --checkpointDir=hdfs:///tmp/${APPNAME}/checkpoint \
    --awsRegion=us-east-1 \
    --inputStreamName=gts_in_stream_${APPNAME} \
    --outputStreamName=publish_in_stream_${APPNAME} \
    --outputPath=hdfs:///tmp/mani/${APPNAME}/out \
    --sedimentTable=${APPNAME}-ORG_ALERTS-ORG_ALERTS-bq-sediment \
    --useWindow=true \
    --streamFromHorizon=true \
    --windowDuration=60s --windowLateness=10s --windowElementCount=1 \
    --updateSediment=true \
    --useSedimentLookups=true \
    --outputPendToFile=true \
    --outputPendToStream=true \
    --primePipeline=ORG_ALERTS --primeCategory=ORG_ALERTS \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch checkpoint data from the checkpointDir path provided. But when it tries to broadcast this information, it fails with the exception below.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)

As far as I understand, this is what is happening. The Beam SDK needs to continuously publish checkpoints/watermarks. In SparkRunner mode, it uses a Spark Accumulator to achieve this (MetricsContainerStepMapAccumulator?).

In Spark, you need to register your Accumulators upfront so they can be serialized. The SparkRunner does this, but for whatever reason the Accumulator is not getting registered before it is sent to the executors.

This happens in Streaming mode with --jobName provided.
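
To illustrate that Spark rule in isolation, here is a minimal plain-Spark Java sketch (this is not the Beam runner code, and the class and accumulator names are made up): an AccumulatorV2 that a task closure captures must be registered with the SparkContext before the closure is serialized, otherwise writeReplace() throws the same "Accumulator must be registered before send to executor" error.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorRegistrationSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setAppName("acc-sketch").setMaster("local[2]"));

    // Registered with the SparkContext up front: safe to capture in executor-side code.
    LongAccumulator registered = new LongAccumulator();
    jsc.sc().register(registered, "records");

    // Never registered: serializing a closure that captures it fails with
    // UnsupportedOperationException("Accumulator must be registered before send to executor").
    LongAccumulator unregistered = new LongAccumulator();

    jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> registered.add(1));      // works
    // jsc.parallelize(Arrays.asList(1, 2, 3)).foreach(x -> unregistered.add(1)); // would fail

    System.out.println("count = " + registered.value());
    jsc.close();
  }
}

That would match what a restart from a Spark Streaming checkpoint does here: the recovered DStream closures already reference a metrics accumulator instance which, in the new driver JVM, has apparently not been registered yet.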

20/07/10 13:31:38 ERROR org.apache.spark.streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: Task not serializable
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               …
…
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
               at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
               at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
               at org.apache.beam.runners.spark.SparkRunner.lambda$run$0(SparkRunner.java:208)
               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
               at org.apache.spark.api.java.JavaRDDLike$class.mapPartitionsToPair(JavaRDDLike.scala:188)
               at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsToPair(JavaRDDLike.scala:45)
               at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8.lambda$evaluate$905e84cb$1(StreamingTransformTranslator.java:416)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:317)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:318)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:667)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:681)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:679)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:46)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               …
              …
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
               at scala.collection.immutable.List.foreach(List.scala:392)
               at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
               at scala.collection.immutable.List.map(List.scala:296)
               at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
               at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
               at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
               at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
               at scala.Option.orElse(Option.scala:289)
               at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
               at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
               at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
               at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
               at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
               at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
               at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
               at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
               at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
               at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
               at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
               at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
               at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
               at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
               at org.apache.spark.util.ThreadUtils$$anon$2.run(ThreadUtils.scala:145)
20/07/10 13:31:38 WARN org.apache.spark.streaming.util.BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
20/07/10 13:31:38 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:102)
               at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:90)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.executeStreamProcessing(PrimeStreamProcessor.java:123)
               at com.dnb.optimus.prime.processor.PrimeStreamProcessor.main(PrimeStreamProcessor.java:140)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.lang.reflect.Method.invoke(Method.java:498)
               at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
               at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
               at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
               at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
               at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
               at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
               at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
               at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
               at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
               at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
               at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
               at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
               at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)

From: Sunny, Mani Kolbe <Su...@DNB.com>
Sent: Friday, July 10, 2020 11:43 AM
To: user@beam.apache.org
Subject: RE: KinesisIO checkpointing


If I don’t set withInitialPositionInStream(), it will throw an NPE.

Read kinesisReader = KinesisIO.read()
                //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
                .withStreamName(options.getInputStreamName());

The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.

org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
               at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
               at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
               at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
               at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
               at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
               at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
               at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
               at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
               at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
               at org.apache.spark.scheduler.Task.run(Task.scala:121)
               at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
               at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
               at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
               at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)

From: Mani Kolbe <ma...@gmail.com>>
Sent: Thursday, July 9, 2020 10:56 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Is it required to set JobName and checkpointDir options for checkpointing to work?



On Thu, 9 Jul, 2020, 9:25 PM Luke Cwik, <lc...@google.com>> wrote:
The BoundedReadFromUnboundedReader does checkpoint the underlying UnboundedSource, is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We did the same and started using maxReadTime and put the application to run on a recurring schedule of 5 minutes. It works fine end to end without any error.

But the problem is that it always starts reading from the beginning of the Kinesis stream when it stop-starts.

When I did some investigation on that, I found that when you set maxReadTime, it will run using BoundedReadFromUnboundedSource mode. That essentially converts source in to a bounded one. This means checkpointing or watermark no longer supported. Reader just reads for x number of time and exists.

Is there anyway recommended way to resume reading from the position it finished? Either using maxReadTime or in unboundedSource mode?

Could some point me to a sample pipeline code that uses Kinesis as source?

Regards,
Mani

From: Lars Almgren Schwartz <la...@tink.com>>
Sent: Thursday, June 25, 2020 7:53 AM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

We had the exact same problem, but have not spent any time trying to solve it, we just skipped checkpointing for now.
When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.

On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe <Su...@dnb.com>> wrote:
We are on spark 2.4 and Beam 2.22.0

From: Alexey Romanenko <ar...@gmail.com>>
Sent: Wednesday, June 24, 2020 5:15 PM
To: user@beam.apache.org<ma...@beam.apache.org>
Subject: Re: KinesisIO checkpointing

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Yes, KinesisIO supports restart from checkpoints and it’s based on runner checkpoints support [1].

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838<https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fstackoverflow.com%2Fquestions%2F62259364%2Fhow-apache-beam-manage-kinesis-checkpointing%2F62349838%2362349838&data=02%7C01%7CSunnyM%40dnb.com%7Cafec5f7b8ed143b94cc908d824be0cd3%7C19e2b708bf12437597198dec42771b3e%7C0%7C0%7C637299745979465190&sdata=tCnaB7tW%2BpCLpxFtwM8Kd5jqDVdLavN9eOrpgcOZufI%3D&reserved=0>

On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe <Su...@DNB.com>> wrote:

Hello,

We are developing a beam pipeline which runs on SparkRunner on streaming mode. This pipeline read from Kinesis, do some translations, filtering and finally output to S3 using AvroIO writer. We are using Fixed windows with triggers based on element count and processing time intervals. Outputs path is partitioned by window start timestamp. allowedLateness=0sec

This is working fine, but I have noticed that whenever we restarts streaming, application is starting to read from Kinesis TRIM_HORIZON. That is, it is not resuming from last checkpoint position. Then I found that the checkpoint directory is based on --jobName and --checkpointDir properties. So I tried running as below:

spark-submit --master yarn --deploy-mode cluster --conf spark.dynamicAllocation.enabled=false \
    --driver-memory 1g --executor-memory 1g --num-executors 1 --executor-cores 1 \
    --class com.dnb.optimus.prime.processor.PrimeStreamProcessor \
    --conf spark.executor.extraClassPath=/etc/hbase/conf \
    /tmp/stream-processor-0.0.0.8-spark.jar \
    --runner=SparkRunner \
    --jobName=PrimeStreamProcessor \
    --checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \
    --useWindow=true \
    --windowDuration=60s --windowLateness=0s --windowElementCount=1 \
    --maxReadTime=-1 \
    --streaming=true

I can see that it is able to fetch checkpoint data from checkpointDir path provided. But When the driver tries to broadcast this information to executors, it is failing with below exception.
20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message: User class threw exception: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
        at org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
        ....
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:678)
Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
        at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)


Any idea? Is resuming from checkpoint position on application restart is actually supported on KinesisIO?

Regards,
Mani


RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
If I don’t set withInitialPositionInStream(), it will throw an NPE.

Read kinesisReader = KinesisIO.read()
                //.withInitialPositionInStream(options.getStreamFromHorizon() ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
                .withStreamName(options.getInputStreamName());

The code above results in the NPE shown below. This is with Spark 2.4.0 and Beam 2.22.0.

org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
               at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn$DoFnInvoker.invokeProcessElement(Unknown Source)
               at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
               at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
               at org.apache.beam.runners.spark.translation.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:65)
               at org.apache.beam.runners.spark.translation.SparkProcessContext$ProcCtxtIterator.computeNext(SparkProcessContext.java:137)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
               at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
               at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
               at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
               at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
               at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:125)
               at org.apache.beam.runners.spark.translation.MultiDoFnFunction.call(MultiDoFnFunction.java:63)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:186)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
               at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
               at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
               at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
               at org.apache.spark.scheduler.Task.run(Task.scala:121)
               at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
               at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
               at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:185)
               at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
               at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
               at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:93)
               at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$SplitFn.process(BoundedReadFromUnboundedSource.java:165)
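
For comparison, here is a minimal sketch of a read configuration that always supplies an initial position. The stream name, credentials and region are placeholders and the snippet is untested; only the KinesisIO.Read methods themselves come from the Beam API.

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.io.kinesis.KinesisIO;

public class KinesisReadConfig {
  // Build a read that always has a starting point, so the shard finder is
  // never asked to validate a missing position (the NPE in the trace above).
  static KinesisIO.Read buildReader(boolean fromHorizon) {
    return KinesisIO.read()
        .withStreamName("my-input-stream") // placeholder stream name
        .withInitialPositionInStream(
            fromHorizon ? InitialPositionInStream.TRIM_HORIZON : InitialPositionInStream.LATEST)
        // placeholder credentials and region; any AWSClientsProvider works here
        .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1);
  }
}

With the initial position set explicitly, StartingPointShardsFinder has a concrete starting point to work with, which appears to be what the trace above is missing.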



Re: KinesisIO checkpointing

Posted by Mani Kolbe <ma...@gmail.com>.
Is it required to set the jobName and checkpointDir options for checkpointing to work?
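
For reference, a rough sketch of how those two options could be wired up on the Spark runner. It assumes SparkPipelineOptions exposes a checkpointDir option (as in the Beam Spark runner) and that --jobName/--checkpointDir are parsed from the command line; the class name and values are illustrative and the code is untested.

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedStreamJob {
  public static void main(String[] args) {
    // --jobName and --checkpointDir passed on the command line are picked up here,
    // or they can be set explicitly as shown below.
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    options.setJobName("PrimeStreamProcessor");                   // keep the name stable across restarts
    options.setCheckpointDir("hdfs:///tmp/PrimeStreamProcessor"); // keep the checkpoint location stable too
    Pipeline p = Pipeline.create(options);
    // ... build the Kinesis read and the rest of the pipeline here ...
    p.run().waitUntilFinish();
  }
}

The intent is simply that both values stay the same between runs so the runner can find its previous checkpoints.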




Re: KinesisIO checkpointing

Posted by Luke Cwik <lc...@google.com>.
The BoundedReadFromUnboundedSource wrapper does checkpoint the underlying UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?


RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
We did the same and started using maxReadTime, running the application on a recurring 5-minute schedule. It works fine end to end without any errors.

But the problem is that it always starts reading from the beginning of the Kinesis stream whenever it stops and restarts.

When I investigated, I found that setting maxReadTime makes the read run through BoundedReadFromUnboundedSource. That essentially converts the source into a bounded one, which means checkpointing and watermarks are no longer supported; the reader just reads for the configured amount of time and exits.

Is there any recommended way to resume reading from the position where it finished, either with maxReadTime or in unbounded-source mode?

Could someone point me to sample pipeline code that uses Kinesis as a source?

Regards,
Mani
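
As a rough answer to the sample-pipeline question above, here is a minimal, untested sketch of a streaming pipeline that reads from Kinesis and writes windowed text output. The stream name, credentials, region and output path are placeholders; only the Beam transforms themselves are real API.

import java.nio.charset.StandardCharsets;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class KinesisSamplePipeline {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadKinesis", KinesisIO.read()
            .withStreamName("my-input-stream")                                       // placeholder
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1))  // placeholders
        // Decode each Kinesis record payload as a UTF-8 string.
        .apply("DecodeUtf8", MapElements.into(TypeDescriptors.strings())
            .via((KinesisRecord r) -> new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
        // Window the unbounded stream so the text sink can write per-window files.
        .apply("FixedWindow", Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))))
        .apply("WriteText", TextIO.write()
            .to("s3://my-bucket/kinesis-sample/part")  // placeholder output prefix
            .withWindowedWrites()
            .withNumShards(1));

    p.run().waitUntilFinish();
  }
}

The sketch deliberately leaves out withMaxReadTime/withMaxNumRecords since, as noted above, those turn the read into a BoundedReadFromUnboundedSource and checkpointing no longer applies.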



Re: KinesisIO checkpointing

Posted by Lars Almgren Schwartz <la...@tink.com>.
We had the exact same problem but have not spent any time trying to solve it; we just skipped checkpointing for now.
When we saw this problem we were running Spark 2.4.5 (local mode) and Kinesis 2.18 and 2.19.


RE: KinesisIO checkpointing

Posted by "Sunny, Mani Kolbe" <Su...@DNB.com>.
We are on Spark 2.4 and Beam 2.22.0.



Re: KinesisIO checkpointing

Posted by Alexey Romanenko <ar...@gmail.com>.
Yes, KinesisIO supports restarting from checkpoints; it relies on the runner’s checkpoint support [1].

Could you specify which version of Spark and Beam you use?

[1] https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
