Posted to user@spark.apache.org by Biplob Biswas <re...@gmail.com> on 2018/05/17 08:28:44 UTC

Spark job ends when no assignment is found for a Kafka partition

Hi,

I am seeing a peculiar problem with the Spark jobs on our cluster: a job
ends with the message:

No current assignment for partition iomkafkaconnector-deliverydata-dev-2


Our setup has 4 Kafka partitions and 4 Spark executors, so each partition
should be read directly by one executor. Interestingly, we have not set
'spark.dynamicAllocation.enabled' to true, yet executors still die at
times and new executors are added.

The problem I see is that when executors go down, their partitions are not
reassigned to the active executors. I can't say why this is happening, but
the following log is what I get just before the Spark job dies.



18/05/12 03:52:52 INFO internals.ConsumerCoordinator: Setting newly
assigned partitions [hierarchy-updates-dev-0,
iomkafkaconnector-deliverydata-dev-0,
iomkafkaconnector-deliverydata-dev-1] for group
data-in-cleansing-consumer
18/05/12 03:53:01 ERROR scheduler.JobScheduler: Error generating jobs
for time 1526075610000 ms
java.lang.IllegalStateException: No current assignment for partition
iomkafkaconnector-deliverydata-dev-2
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/12 03:53:01 ERROR yarn.ApplicationMaster: User class threw
exception: java.lang.IllegalStateException: No current assignment for
partition iomkafkaconnector-deliverydata-dev-2
java.lang.IllegalStateException: No current assignment for partition
iomkafkaconnector-deliverydata-dev-2
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/12 03:53:01 INFO yarn.ApplicationMaster: Final app status:
FAILED, exitCode: 15, (reason: User class threw exception:
java.lang.IllegalStateException: No current assignment for partition
iomkafkaconnector-deliverydata-dev-2)
18/05/12 03:53:01 INFO streaming.StreamingContext: Invoking
stop(stopGracefully=false) from shutdown hook
18/05/12 03:53:01 INFO scheduler.ReceiverTracker: ReceiverTracker stopped
18/05/12 03:53:01 INFO scheduler.JobGenerator: Stopping JobGenerator immediately
18/05/12 03:53:01 INFO util.RecurringTimer: Stopped timer for
JobGenerator after time 1526097180000
18/05/12 03:53:02 ERROR scheduler.JobScheduler: Error generating jobs
for time 1526075620000 ms
java.lang.IllegalStateException: No current assignment for partition
iomkafkaconnector-deliverydata-dev-2
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:194)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/12 03:53:02 WARN kerberos.KerberosLogin: TGT renewal thread has
been interrupted and will exit.
18/05/12 03:53:02 ERROR scheduler.JobScheduler: Error generating jobs
for time 1526075630000 ms
java.lang.IllegalStateException: This consumer has already been closed.
	at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1417)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1428)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/12 03:53:02 INFO scheduler.JobGenerator: Stopped JobGenerator
18/05/12 03:53:02 INFO scheduler.JobScheduler: Stopped JobScheduler
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@56a3af9e{/streaming,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@4ca2cc2b{/streaming/batch,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@39330226{/static/streaming,null,UNAVAILABLE,@Spark}
18/05/12 03:53:02 INFO streaming.StreamingContext: StreamingContext
stopped successfully
18/05/12 03:53:02 INFO spark.SparkContext: Invoking stop() from shutdown hook
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed
broadcast_537_piece0 on 10.2.0.12:46563 in memory (size: 4.6 KB, free:
93.3 MB)
18/05/12 03:53:02 INFO server.AbstractConnector: Stopped
Spark@45ad8de8{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed
broadcast_537_piece0 on
cdh2.germanycentral.cloudapp.microsoftazure.de:36878 in memory (size:
4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed
broadcast_537_piece0 on
cdh3.germanycentral.cloudapp.microsoftazure.de:42605 in memory (size:
4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed
broadcast_537_piece0 on
cdh2.germanycentral.cloudapp.microsoftazure.de:36740 in memory (size:
4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO storage.BlockManagerInfo: Removed
broadcast_537_piece0 on
cdh2.germanycentral.cloudapp.microsoftazure.de:34228 in memory (size:
4.6 KB, free: 93.3 MB)
18/05/12 03:53:02 INFO ui.SparkUI: Stopped Spark web UI at
http://10.2.0.12:37118
18/05/12 03:53:02 INFO cluster.YarnClusterSchedulerBackend: Shutting
down all executors
18/05/12 03:53:02 INFO
cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor
to shut down
18/05/12 03:53:02 INFO cluster.SchedulerExtensionServices: Stopping
SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
18/05/12 03:53:02 INFO spark.MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
18/05/12 03:53:02 INFO memory.MemoryStore: MemoryStore cleared
18/05/12 03:53:02 INFO storage.BlockManager: BlockManager stopped
18/05/12 03:53:02 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/05/12 03:53:02 INFO
scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
18/05/12 03:53:02 INFO spark.SparkContext: Successfully stopped SparkContext
18/05/12 03:53:02 INFO yarn.ApplicationMaster: Unregistering
ApplicationMaster with FAILED (diag message: User class threw
exception: java.lang.IllegalStateException: No current assignment for
partition iomkafkaconnector-deliverydata-dev-2)
18/05/12 03:53:02 INFO impl.AMRMClientImpl: Waiting for application to
be successfully unregistered.
18/05/12 03:53:02 INFO util.ShutdownHookManager: Shutdown hook called



Thanks & Regards
Biplob Biswas