You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nifi.apache.org by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com> on 2016/04/14 22:00:19 UTC

GetKafka blowing up with assertion error in Kafka client code

I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.


2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
        ... 12 common frames omitted
Caused by: java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:165) ~[na:na]
        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
        at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
        at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
        ... 3 common frames omitted

Re: GetKafka blowing up with assertion error in Kafka client code

Posted by Joe Witt <jo...@gmail.com>.
As with any system to system interaction things can happen.  All
systems, including Kafka provide facilities to allow systems writing
to or consuming from it, to recover from failure cases.  So let's just
focus on what the config/environment is and do our best to provide
ways to work past these issues.  It doesn't help us or anyone else to
highlight frequent deadlocks so let's just stay focused on what we can
do to help.

On Thu, Apr 14, 2016 at 4:13 PM, Oleg Zhurakousky
<oz...@hortonworks.com> wrote:
> Chris
> That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
> So here are more details. . .
>
> The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
> When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
> Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
> Could you share your config or whatever other details?
>
> Cheers
> Oleg
>
>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>
>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>>
>>
>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>        ... 12 common frames omitted
>> Caused by: java.lang.AssertionError: assertion failed
>>        at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>        at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>        at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>        ... 3 common frames omitted
>

Re: GetKafka blowing up with assertion error in Kafka client code

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
I’ve created a JIRA[1] for this.  

Chris

[1] https://issues.apache.org/jira/browse/NIFI-1827





On 4/28/16, 1:36 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:

>Chris
>
>Thanks for looking into this and describing the problem. Indeed we have seen similar symptoms but would need to further investigate and see if there is an option to stop the internal to Kafka reconnect thread. It appears there are configuration properties in the new API to do that, while I am not sure about the old at the moment. 
>As I said, will investigate further and let you know 
>
>Thanks again for looking into this
>
>Oleg 
>
>Sent from my iPhone
>
>> On Apr 28, 2016, at 18:41, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>> 
>> Oleg,
>> 
>> I have reproduced the problem.  Its pretty easy to do. Just delete and recreate the topic while the processor is running.  I think I saw a similar problem when I increased the partitions in the topic.  That problem resolved itself when I restarted the GetKafka processors.  However, to resolve this problem restarting the processor does not work. It must be that something is being stored in Zookeeper.  I am guessing that deleting and recreating the processor will do the trick.  Is there any debugging information which I can provide to you?
>> 
>> Thanks,
>> Chris
>> 
>> 
>> 
>>> On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
>>> 
>>> Thanks Chris
>>> 
>>> Indeed let us know if/when/how to reproduce it so we can evaluate and see if it is something we can validate/handle in NiFi before it is passed to Kafka (e.g., validation etc)
>>> 
>>> Cheers
>>> Oleg
>>> 
>>>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>>> 
>>>> I looked at the Kafka client code and it seemed to me to be a bug in the caller. There is a map passed that maps topics to number of consumers. In this case it asserting that the number of consumers is greater than zero. If I can repro the problem I'll try to isolate it in the debugger and provide more details.
>>>> 
>>>> 
>>>> 
>>>> Sent from my Verizon, Samsung Galaxy smartphone
>>>> 
>>>> 
>>>> -------- Original message --------
>>>> From: Oleg Zhurakousky <oz...@hortonworks.com>
>>>> Date: 4/14/16 4:14 PM (GMT-05:00)
>>>> To: dev@nifi.apache.org
>>>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>>>> 
>>>> Chris
>>>> That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
>>>> So here are more details. . .
>>>> 
>>>> The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
>>>> When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
>>>> Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
>>>> Could you share your config or whatever other details?
>>>> 
>>>> Cheers
>>>> Oleg
>>>> 
>>>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>>>> 
>>>>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>>>>> 
>>>>> 
>>>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>>      at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>>>      at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>>>      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>>>      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>>      at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>>>      at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>>>      at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>>>      ... 12 common frames omitted
>>>>> Caused by: java.lang.AssertionError: assertion failed
>>>>>      at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>>>      at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>>>      at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>>>      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>>>      at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>>>      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>>>      at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>>>      at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>>>      at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>>>      at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>>>      at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>>>      at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>>>      at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>>>      at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>>>      ... 3 common frames omitted
>>> 

Re: GetKafka blowing up with assertion error in Kafka client code

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Chris

Thanks for looking into this and describing the problem. Indeed we have seen similar symptoms but would need to further investigate and see if there is an option to stop the internal to Kafka reconnect thread. It appears there are configuration properties in the new API to do that, while I am not sure about the old at the moment. 
As I said, will investigate further and let you know 

Thanks again for looking into this

Oleg 

Sent from my iPhone

> On Apr 28, 2016, at 18:41, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> 
> Oleg,
> 
> I have reproduced the problem.  Its pretty easy to do. Just delete and recreate the topic while the processor is running.  I think I saw a similar problem when I increased the partitions in the topic.  That problem resolved itself when I restarted the GetKafka processors.  However, to resolve this problem restarting the processor does not work. It must be that something is being stored in Zookeeper.  I am guessing that deleting and recreating the processor will do the trick.  Is there any debugging information which I can provide to you?
> 
> Thanks,
> Chris
> 
> 
> 
>> On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:
>> 
>> Thanks Chris
>> 
>> Indeed let us know if/when/how to reproduce it so we can evaluate and see if it is something we can validate/handle in NiFi before it is passed to Kafka (e.g., validation etc)
>> 
>> Cheers
>> Oleg
>> 
>>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>> 
>>> I looked at the Kafka client code and it seemed to me to be a bug in the caller. There is a map passed that maps topics to number of consumers. In this case it asserting that the number of consumers is greater than zero. If I can repro the problem I'll try to isolate it in the debugger and provide more details.
>>> 
>>> 
>>> 
>>> Sent from my Verizon, Samsung Galaxy smartphone
>>> 
>>> 
>>> -------- Original message --------
>>> From: Oleg Zhurakousky <oz...@hortonworks.com>
>>> Date: 4/14/16 4:14 PM (GMT-05:00)
>>> To: dev@nifi.apache.org
>>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>>> 
>>> Chris
>>> That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
>>> So here are more details. . .
>>> 
>>> The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
>>> When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
>>> Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
>>> Could you share your config or whatever other details?
>>> 
>>> Cheers
>>> Oleg
>>> 
>>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>>> 
>>>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>>>> 
>>>> 
>>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>      at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>>      at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>      at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>      at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>>      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>>      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>>      at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>>      at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>>      at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>>      ... 12 common frames omitted
>>>> Caused by: java.lang.AssertionError: assertion failed
>>>>      at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>>      at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>>      at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>>      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>>      at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>>      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>>      at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>>      at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>>      at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>>      at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>>      at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>>      at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>>      at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>>      at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>>      ... 3 common frames omitted
>> 

Re: GetKafka blowing up with assertion error in Kafka client code

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
Oleg,

I have reproduced the problem.  Its pretty easy to do. Just delete and recreate the topic while the processor is running.  I think I saw a similar problem when I increased the partitions in the topic.  That problem resolved itself when I restarted the GetKafka processors.  However, to resolve this problem restarting the processor does not work. It must be that something is being stored in Zookeeper.  I am guessing that deleting and recreating the processor will do the trick.  Is there any debugging information which I can provide to you?

Thanks,
Chris



On 4/14/16, 8:32 PM, "Oleg Zhurakousky" <oz...@hortonworks.com> wrote:

>Thanks Chris
>
>Indeed let us know if/when/how to reproduce it so we can evaluate and see if it is something we can validate/handle in NiFi before it is passed to Kafka (e.g., validation etc)
>
>Cheers
>Oleg
>
>> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>> 
>> I looked at the Kafka client code and it seemed to me to be a bug in the caller. There is a map passed that maps topics to number of consumers. In this case it asserting that the number of consumers is greater than zero. If I can repro the problem I'll try to isolate it in the debugger and provide more details.
>> 
>> 
>> 
>> Sent from my Verizon, Samsung Galaxy smartphone
>> 
>> 
>> -------- Original message --------
>> From: Oleg Zhurakousky <oz...@hortonworks.com>
>> Date: 4/14/16 4:14 PM (GMT-05:00)
>> To: dev@nifi.apache.org
>> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
>> 
>> Chris
>> That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
>> So here are more details. . .
>> 
>> The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
>> When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
>> Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
>> Could you share your config or whatever other details?
>> 
>> Cheers
>> Oleg
>> 
>>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>>> 
>>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>>> 
>>> 
>>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>>       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>>       at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>>       at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>>       ... 12 common frames omitted
>>> Caused by: java.lang.AssertionError: assertion failed
>>>       at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>>       at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>>       at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>>       at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>>       at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>>       at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>>       ... 3 common frames omitted
>> 
>> 
>

Re: GetKafka blowing up with assertion error in Kafka client code

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Thanks Chris

Indeed let us know if/when/how to reproduce it so we can evaluate and see if it is something we can validate/handle in NiFi before it is passed to Kafka (e.g., validation etc)

Cheers
Oleg

> On Apr 14, 2016, at 8:25 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> 
> I looked at the Kafka client code and it seemed to me to be a bug in the caller. There is a map passed that maps topics to number of consumers. In this case it asserting that the number of consumers is greater than zero. If I can repro the problem I'll try to isolate it in the debugger and provide more details.
> 
> 
> 
> Sent from my Verizon, Samsung Galaxy smartphone
> 
> 
> -------- Original message --------
> From: Oleg Zhurakousky <oz...@hortonworks.com>
> Date: 4/14/16 4:14 PM (GMT-05:00)
> To: dev@nifi.apache.org
> Subject: Re: GetKafka blowing up with assertion error in Kafka client code
> 
> Chris
> That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
> So here are more details. . .
> 
> The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
> When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
> Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
> Could you share your config or whatever other details?
> 
> Cheers
> Oleg
> 
>> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>> 
>> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>> 
>> 
>> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
>> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>>       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
>> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>>       at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>>       at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>>       at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>>       ... 12 common frames omitted
>> Caused by: java.lang.AssertionError: assertion failed
>>       at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>>       at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>>       at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>>       at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>>       at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>>       at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>>       at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>>       at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>>       at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>>       at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>>       ... 3 common frames omitted
> 
> 


RE: GetKafka blowing up with assertion error in Kafka client code

Posted by "McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote)" <ch...@hpe.com>.
I looked at the Kafka client code and it seemed to me to be a bug in the caller. There is a map passed that maps topics to number of consumers. In this case it asserting that the number of consumers is greater than zero. If I can repro the problem I'll try to isolate it in the debugger and provide more details.



Sent from my Verizon, Samsung Galaxy smartphone


-------- Original message --------
From: Oleg Zhurakousky <oz...@hortonworks.com>
Date: 4/14/16 4:14 PM (GMT-05:00)
To: dev@nifi.apache.org
Subject: Re: GetKafka blowing up with assertion error in Kafka client code

Chris
That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have.
So here are more details. . .

The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError).
Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
Could you share your config or whatever other details?

Cheers
Oleg

> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
>
> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
>
>
> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>        ... 12 common frames omitted
> Caused by: java.lang.AssertionError: assertion failed
>        at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>        at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>        at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>        ... 3 common frames omitted


Re: GetKafka blowing up with assertion error in Kafka client code

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
Chris
That is correct and for a change I am pretty happy to see this stack trace as it clearly shows the problem and validates the approach we have. 
So here are more details. . .

The root failure is in Kafka (as you can see from the stack trace). All we are doing is encapsulating interaction with Kafka into cancelable Future so we can cancel if and when Kafka deadlocks (which we noticed happens rather often)
When we execute Future.get() it results in ExecutionException which caries the original Kafka exception (AssertionError). 
Now I am not sure what that assertion error really means in the context of what you are trying to do but its clearly a problem originated in Kafka.
Could you share your config or whatever other details?

Cheers
Oleg

> On Apr 14, 2016, at 4:00 PM, McDermott, Chris Kevin (MSDU - STaTS/StorefrontRemote) <ch...@hpe.com> wrote:
> 
> I’m running based of of 0.7.0 Snapshot.  The GetKafka config is pretty generic.  Batch size 1, 1 concurrent task.
> 
> 
> 2016-04-14 19:27:23,204 ERROR [Timer-Driven Process Thread-9] o.apache.nifi.processors.kafka.GetKafka
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:355) ~[na:na]
>        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.7.0-SNAPSHOT.jar:0.7.0-SNAPSHOT]
>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
>        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> Caused by: java.util.concurrent.ExecutionException: java.lang.AssertionError: assertion failed
>        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_45]
>        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_45]
>        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:348) ~[na:na]
>        ... 12 common frames omitted
> Caused by: java.lang.AssertionError: assertion failed
>        at scala.Predef$.assert(Predef.scala:165) ~[na:na]
>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:51) ~[na:na]
>        at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:49) ~[na:na]
>        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) ~[na:na]
>        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) ~[na:na]
>        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) ~[na:na]
>        at kafka.consumer.TopicCount$.makeConsumerThreadIdsPerTopic(TopicCount.scala:49) ~[na:na]
>        at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:113) ~[na:na]
>        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:226) ~[na:na]
>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85) ~[na:na]
>        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:281) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:343) ~[na:na]
>        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:340) ~[na:na]
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_45]
>        ... 3 common frames omitted