Posted to user@spark.apache.org by "Rastogi, Pankaj" <pa...@verizon.com> on 2017/06/08 05:42:24 UTC

SPARK-19547

Hi,
 I have been trying to distribute Kafka topics among different instances of the same consumer group. I am using the KafkaDirectStream API to create DStreams. After the second consumer group comes up, Kafka does a partition rebalance and then the Spark driver of the first consumer dies with the following exception:

java.lang.IllegalStateException: No current assignment for partition myTopic_5-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I see that there is a Spark ticket opened for the same issue (https://issues.apache.org/jira/browse/SPARK-19547), but it has been marked as INVALID. Can someone explain why this ticket is marked INVALID?

Thanks,
Pankaj

Re: [E] Re: SPARK-19547

Posted by Sree V <sr...@yahoo.com.INVALID>.
Hi Pankaj,
>> After the second consumer group comes up
Do you mean a second consumer starts with the same consumer group as the first?

createDirectStream is overloaded. One of the overloads doesn't require you to specify the partitions of a topic.
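
For example, a minimal sketch of the Subscribe-based overload (broker
address, topic, and group id are placeholders here, and ssc is your
StreamingContext):

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "my-consumer-group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Subscribe lets Kafka assign partitions; no TopicPartition list is needed.
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent,
    Subscribe[String, String](Seq("myTopic_5"), kafkaParams)
  )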

Cheers
- Sree 

    On Thursday, June 8, 2017 9:56 AM, "Rastogi, Pankaj" <pa...@verizon.com> wrote:
 

 Hi,
 Thank you for your reply!
 You got it right! I am trying to run multiple streams using the same
consumer group, so that I can distribute different partitions among
different instances of the consumer group. I don't want to provide the
list of partitions in the createDirectStream API. If I do that, then it
will become difficult to handle consumer failure, as those partitions
won't be read by any consumer. Also, I will have to handle the addition
of new partitions.

I wanted to see if I can use the partition rebalance feature.

Pankaj

On 6/8/17, 8:24 AM, "Cody Koeninger" <co...@koeninger.org> wrote:

>Can you explain in more detail what you mean by "distribute Kafka
>topics among different instances of same consumer group"?
>
>If you're trying to run multiple streams using the same consumer
>group, it's already documented that you shouldn't do that.
>
>On Thu, Jun 8, 2017 at 12:43 AM, Rastogi, Pankaj
><pa...@verizon.com> wrote:
>> Hi,
>>  I have been trying to distribute Kafka topics among different
>> instances of the same consumer group. I am using the KafkaDirectStream
>> API to create DStreams. After the second consumer group comes up, Kafka
>> does a partition rebalance and then the Spark driver of the first
>> consumer dies with the following exception:
>>
>> java.lang.IllegalStateException: No current assignment for partition
>> myTopic_5-0
>> [stack trace snipped; identical to the one in the original message]
>>
>> I see that there is a Spark ticket opened for the same issue
>> (https://issues.apache.org/jira/browse/SPARK-19547), but it has been
>> marked as INVALID. Can someone explain why this ticket is marked
>> INVALID?
>>
>> Thanks,
>> Pankaj

Re: [E] Re: SPARK-19547

Posted by "Rastogi, Pankaj" <pa...@verizon.com>.
Hi,
 Thank you for your reply!
 You got it right! I am trying to run multiple streams using the same
consumer group, so that I can distribute different partitions among
different instances of the consumer group. I don't want to provide the
list of partitions in the createDirectStream API. If I do that, then it
will become difficult to handle consumer failure, as those partitions
won't be read by any consumer. Also, I will have to handle the addition
of new partitions.

I wanted to see if I can use the partition rebalance feature.
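
For reference, a sketch of the kind of pinned-partition setup I want to
avoid (hypothetical partition numbers; kafkaParams and ssc as in the usual
direct-stream setup):

  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

  // Assign pins this instance to fixed partitions: if the instance dies,
  // nothing else picks up partitions 0 and 1, and newly added partitions
  // of myTopic_5 are never consumed.
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent,
    Assign[String, String](
      Seq(new TopicPartition("myTopic_5", 0), new TopicPartition("myTopic_5", 1)),
      kafkaParams))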

Pankaj

On 6/8/17, 8:24 AM, "Cody Koeninger" <co...@koeninger.org> wrote:

>Can you explain in more detail what you mean by "distribute Kafka
>topics among different instances of same consumer group"?
>
>If you're trying to run multiple streams using the same consumer
>group, it's already documented that you shouldn't do that.
>
>On Thu, Jun 8, 2017 at 12:43 AM, Rastogi, Pankaj
><pa...@verizon.com> wrote:
>> Hi,
>>  I have been trying to distribute Kafka topics among different
>> instances of the same consumer group. I am using the KafkaDirectStream
>> API to create DStreams. After the second consumer group comes up, Kafka
>> does a partition rebalance and then the Spark driver of the first
>> consumer dies with the following exception:
>>
>> java.lang.IllegalStateException: No current assignment for partition
>> myTopic_5-0
>> [stack trace snipped; identical to the one in the original message]
>>
>> I see that there is a Spark ticket opened for the same issue
>> (https://issues.apache.org/jira/browse/SPARK-19547), but it has been
>> marked as INVALID. Can someone explain why this ticket is marked
>> INVALID?
>>
>> Thanks,
>> Pankaj

Re: SPARK-19547

Posted by Cody Koeninger <co...@koeninger.org>.
Can you explain in more detail what you mean by "distribute Kafka
topics among different instances of same consumer group"?

If you're trying to run multiple streams using the same consumer
group, it's already documented that you shouldn't do that.
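
The guidance in the kafka-0-10 integration docs amounts to a separate
group.id for each call to createDirectStream. A rough sketch (topic and
group names are placeholders; kafkaParams and ssc as in the integration
guide):

  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  // One group.id per stream, so the two streams never trigger rebalances
  // against each other.
  val streamA = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent,
    Subscribe[String, String](Seq("myTopic_5"), kafkaParams + ("group.id" -> "group-a")))
  val streamB = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent,
    Subscribe[String, String](Seq("myTopic_6"), kafkaParams + ("group.id" -> "group-b")))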

On Thu, Jun 8, 2017 at 12:43 AM, Rastogi, Pankaj
<pa...@verizon.com> wrote:
> Hi,
>  I have been trying to distribute Kafka topics among different instances
> of the same consumer group. I am using the KafkaDirectStream API to
> create DStreams. After the second consumer group comes up, Kafka does a
> partition rebalance and then the Spark driver of the first consumer dies
> with the following exception:
>
> java.lang.IllegalStateException: No current assignment for partition
> myTopic_5-0
> [stack trace snipped; identical to the one in the original message]
>
> I see that there is a Spark ticket opened for the same issue
> (https://issues.apache.org/jira/browse/SPARK-19547), but it has been
> marked as INVALID. Can someone explain why this ticket is marked INVALID?
>
> Thanks,
> Pankaj


