You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Josh <jo...@gmail.com> on 2015/04/29 17:29:31 UTC

Trident persistentAggregate only working for the first batch!

Hi all,

I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka,
I'm doing some filtering and aggregation and then calling persistAggregate
to maintain a map state:

stream
      .each(new Fields("msg"), new MyFilter())
      .each(new Fields("msg"), new ExtractFieldsFunction(), new
Fields("id"))
      .groupBy(new Fields("id"))
      .persistentAggregate(MyMapState.getNonTransactional, new
CountAggregator(), new Fields("count"))
      .parallelismHint(1)

It works fine, for the first batch, but then I am having a very strange
problem where after the first batch my map state is no longer called. (i.e.
there a call to multiGet and multiPut for the first batch only).

The spout is still providing tuples, as when I debug I can see that the
filter and function both continue to process input tuples (indefinitely).
But the map state never gets called again!

Why would this happen?

I found a couple of very similar threads to this, but they have gone
unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,

Josh

Re: Trident persistentAggregate only working for the first batch!

Posted by Nikhil Singh <ns...@yahoo.com>.
Yes..You can lookup /transactional/*/jkafka/user/partition_0/* or some similar path in the zookeeper and it should tell you the nextOffset value.  


     On Wednesday, May 6, 2015 9:43 AM, Josh <jo...@gmail.com> wrote:
   

 Hi Nikhil,
Thanks for the reply. OK that makes sense, I was trying to figure out where trident is storing stuff in zookeeper - turns out when I was using LocalCluster it was starting its own local Zookeeper instance on port 2000. But I've figured out how to configure it now: by setting the storm.zookeeper.servers property in storm.yaml.
I can see it's storing batch IDs under: /transactional/{spout_name} and like you said deleting the data under the /transactional root has fixed the exception.
Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?
Thanks,Josh



On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com> wrote:

Hi Josh,I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues.
I think you can configure it to ignore metadata and always read from the head or tail..
-Nikhil 



     On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com> wrote:
   

 I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it.
I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it:
2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration.
 configured start offset time: [-2]2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!

I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
Any ideas how I can resolve this? :) 


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:

Hi Taylor,
Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base.
The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.
Thanks,Josh
On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

Hi Josh,
The trident kafka spout stores offsets in zookeeper as well. See:

https://github.com/apache/storm/tree/master/external/storm-kafka
-Taylor
On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:


Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.
I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure? 
I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.
On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:

Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:
stream      .each(new Fields("msg"), new MyFilter())      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))      .groupBy(new Fields("id"))      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))      .parallelismHint(1)
It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).
The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!
Why would this happen?
I found a couple of very similar threads to this, but they have gone unanswered:https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJhttps://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,
Josh








   



  

Re: Trident persistentAggregate only working for the first batch!

Posted by Nikhil Singh <ns...@yahoo.com>.
Yes, Setting to head or tail is application specific. 
The spout should eventually catchup, except in one case. If offset defaults to -2 (which is the tail) and the data is put in kakfa at a faster rate than it can be consumed then this exception for missing offset will keep popping up as the offset which  was stored in the zookeeper for the current transaction will be missing from the queue by the time the next transaction is processed (because it will be dropped if the queue is of a fixed size). 
-Nikhil 


     On Wednesday, May 6, 2015 10:36 AM, Jeff Maass <JM...@cccis.com> wrote:
   

 
Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?:----------------------------------------------------------------------------------------------------------------------------------------There is an app, specific to Kafka, which will monitor a consumer’s offset.  I would recommend only using it in testing, as I believe, in my environment, it caused locking of some sort.  Note that it is looking at the offset data stored by kafka, not the offset data stored by the spout.https://github.com/quantifind/KafkaOffsetMonitor

Regarding the statement:I think you can configure it to ignore metadata and always read from the head or tail..--------------------That configuration should only be done if the logic of your application is ok with * head: replaying previously processed records* tail: not playing / skipping an unknown volume of records
I wouldn’t necessarily “reset” the offsets in zookeeper.  Shouldn’t the spout eventually catch up to the Kafka partitions / find a valid / existing offset?Like this:try 3, exceptiontry 4, exceptiontry 5, exceptiontry 6, returns valid data



From: Josh <jo...@gmail.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: 2015,Wednesday, May 6 at 09:43
To: "user@storm.apache.org" <us...@storm.apache.org>, Nikhil Singh <ns...@yahoo.com>
Subject: Re: Trident persistentAggregate only working for the first batch!

Hi Nikhil,
Thanks for the reply. OK that makes sense, I was trying to figure out where trident is storing stuff in zookeeper - turns out when I was using LocalCluster it was starting its own local Zookeeper instance on port 2000. But I've figured out how to configure it now: by setting the storm.zookeeper.servers property in storm.yaml.
I can see it's storing batch IDs under: /transactional/{spout_name} and like you said deleting the data under the /transactional root has fixed the exception.
Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?
Thanks,Josh



On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com> wrote:

Hi Josh,I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues.
I think you can configure it to ignore metadata and always read from the head or tail..
-Nikhil



On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com> wrote:


I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it.
I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it:
2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration.
 configured start offset time: [-2]2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!

I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
Any ideas how I can resolve this? :) 


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:

Hi Taylor,
Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base.
The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.
Thanks,Josh
On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

Hi Josh,
The trident kafka spout stores offsets in zookeeper as well. See:

https://github.com/apache/storm/tree/master/external/storm-kafka
-Taylor
On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:


Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.
I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure? 
I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.
On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:

Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:
stream      .each(new Fields("msg"), new MyFilter())      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))      .groupBy(new Fields("id"))      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))      .parallelismHint(1)
It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).
The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!
Why would this happen?
I found a couple of very similar threads to this, but they have gone unanswered:https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJhttps://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,
Josh












  

Re: Trident persistentAggregate only working for the first batch!

Posted by Jeff Maass <JM...@cccis.com>.
Also, for monitoring a running spout, one could monitor the offset metrics produced by trident/TridentKafkaEmitter.java




From: jeffery maass <jm...@cccis.com>>
Reply-To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>
Date: 2015,Wednesday, May 6 at 10:35
To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>
Subject: Re: Trident persistentAggregate only working for the first batch!


Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?:
----------------------------------------------------------------------------------------------------------------------------------------
There is an app, specific to Kafka, which will monitor a consumer’s offset.  I would recommend only using it in testing, as I believe, in my environment, it caused locking of some sort.  Note that it is looking at the offset data stored by kafka, not the offset data stored by the spout.
https://github.com/quantifind/KafkaOffsetMonitor


Regarding the statement:
I think you can configure it to ignore metadata and always read from the head or tail..
--------------------
That configuration should only be done if the logic of your application is ok with
* head: replaying previously processed records
* tail: not playing / skipping an unknown volume of records

I wouldn’t necessarily “reset” the offsets in zookeeper.  Shouldn’t the spout eventually catch up to the Kafka partitions / find a valid / existing offset?
Like this:
try 3, exception
try 4, exception
try 5, exception
try 6, returns valid data




From: Josh <jo...@gmail.com>>
Reply-To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>
Date: 2015,Wednesday, May 6 at 09:43
To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>, Nikhil Singh <ns...@yahoo.com>>
Subject: Re: Trident persistentAggregate only working for the first batch!

Hi Nikhil,

Thanks for the reply. OK that makes sense, I was trying to figure out where trident is storing stuff in zookeeper - turns out when I was using LocalCluster it was starting its own local Zookeeper instance on port 2000. But I've figured out how to configure it now: by setting the storm.zookeeper.servers property in storm.yaml.

I can see it's storing batch IDs under: /transactional/{spout_name} and like you said deleting the data under the /transactional root has fixed the exception.

Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?

Thanks,
Josh




On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com>> wrote:
Hi Josh,
I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues.

I think you can configure it to ignore metadata and always read from the head or tail..

-Nikhil




On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com>> wrote:


I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it.

I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it:

2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration.
 configured start offset time: [-2]
2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!


I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout

Any ideas how I can resolve this? :)


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com>> wrote:
Hi Taylor,

Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base.

The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.

Thanks,
Josh

On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com>> wrote:
Hi Josh,

The trident kafka spout stores offsets in zookeeper as well. See:


https://github.com/apache/storm/tree/master/external/storm-kafka

-Taylor

On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com>> wrote:

Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.

I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure?

I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.

On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com>> wrote:
Hi all,

I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:

stream
      .each(new Fields("msg"), new MyFilter())
      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
      .groupBy(new Fields("id"))
      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
      .parallelismHint(1)

It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).

The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!

Why would this happen?

I found a couple of very similar threads to this, but they have gone unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,

Josh







Re: Trident persistentAggregate only working for the first batch!

Posted by Jeff Maass <JM...@cccis.com>.
Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?:
----------------------------------------------------------------------------------------------------------------------------------------
There is an app, specific to Kafka, which will monitor a consumer’s offset.  I would recommend only using it in testing, as I believe, in my environment, it caused locking of some sort.  Note that it is looking at the offset data stored by kafka, not the offset data stored by the spout.
https://github.com/quantifind/KafkaOffsetMonitor


Regarding the statement:
I think you can configure it to ignore metadata and always read from the head or tail..
--------------------
That configuration should only be done if the logic of your application is ok with
* head: replaying previously processed records
* tail: not playing / skipping an unknown volume of records

I wouldn’t necessarily “reset” the offsets in zookeeper.  Shouldn’t the spout eventually catch up to the Kafka partitions / find a valid / existing offset?
Like this:
try 3, exception
try 4, exception
try 5, exception
try 6, returns valid data




From: Josh <jo...@gmail.com>>
Reply-To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>
Date: 2015,Wednesday, May 6 at 09:43
To: "user@storm.apache.org<ma...@storm.apache.org>" <us...@storm.apache.org>>, Nikhil Singh <ns...@yahoo.com>>
Subject: Re: Trident persistentAggregate only working for the first batch!

Hi Nikhil,

Thanks for the reply. OK that makes sense, I was trying to figure out where trident is storing stuff in zookeeper - turns out when I was using LocalCluster it was starting its own local Zookeeper instance on port 2000. But I've figured out how to configure it now: by setting the storm.zookeeper.servers property in storm.yaml.

I can see it's storing batch IDs under: /transactional/{spout_name} and like you said deleting the data under the /transactional root has fixed the exception.

Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?

Thanks,
Josh




On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com>> wrote:
Hi Josh,
I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues.

I think you can configure it to ignore metadata and always read from the head or tail..

-Nikhil




On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com>> wrote:


I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it.

I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it:

2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration.
 configured start offset time: [-2]
2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!


I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout

Any ideas how I can resolve this? :)


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com>> wrote:
Hi Taylor,

Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base.

The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.

Thanks,
Josh

On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com>> wrote:
Hi Josh,

The trident kafka spout stores offsets in zookeeper as well. See:


https://github.com/apache/storm/tree/master/external/storm-kafka

-Taylor

On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com>> wrote:

Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.

I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure?

I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.

On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com>> wrote:
Hi all,

I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:

stream
      .each(new Fields("msg"), new MyFilter())
      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
      .groupBy(new Fields("id"))
      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
      .parallelismHint(1)

It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).

The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!

Why would this happen?

I found a couple of very similar threads to this, but they have gone unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,

Josh







Re: Trident persistentAggregate only working for the first batch!

Posted by Josh <jo...@gmail.com>.
Sorry I meant transactional.zookeeper.servers in storm.yaml!

On Wed, May 6, 2015 at 3:43 PM, Josh <jo...@gmail.com> wrote:

> Hi Nikhil,
>
> Thanks for the reply. OK that makes sense, I was trying to figure out
> where trident is storing stuff in zookeeper - turns out when I was using
> LocalCluster it was starting its own local Zookeeper instance on port 2000.
> But I've figured out how to configure it now: by setting the
> storm.zookeeper.servers property in storm.yaml.
>
> I can see it's storing batch IDs under: /transactional/{spout_name} and
> like you said deleting the data under the /transactional root has fixed the
> exception.
>
> Just wondering, how do the batch IDs translate to Kafka offsets? Is there
> a way to find out which Kafka offset trident has consumed up to, by looking
> in Zookeeper?
>
> Thanks,
> Josh
>
>
>
>
> On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com>
> wrote:
>
>> Hi Josh,
>> I think you need to clear the Kafka spout's metadata from the trident
>> zookeeper. It has old metadata stored and the offset it has stored is no
>> longer present in the Kafka queues.
>>
>> I think you can configure it to ignore metadata and always read from the
>> head or tail..
>>
>> -Nikhil
>>
>>
>>
>>
>>   On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com> wrote:
>>
>>
>> I still don't have a good understanding of how/where offsets are stored
>> when using the trident spouts and how to configure it.
>>
>> I'm getting this error at the moment (the topology was running fine a
>> couple of days ago...) related to offsets, and not sure how to investigate
>> it:
>>
>> *2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request
>> with offset out of range: [708429188]; retrying with default start offset
>> time from configuration.*
>> * configured start offset time: [-2]*
>> *2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*
>>
>>
>> I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
>>
>> Any ideas how I can resolve this? :)
>>
>>
>> On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:
>>
>> Hi Taylor,
>>
>> Could you be a bit more specific please? I can't find where the wiki
>> describes the trident kafka spout storing offsets or where it's done in the
>> code base.
>>
>> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
>> property which determines where it stores zookeeper offsets. But
>> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout) does not have this property.
>>
>> Thanks,
>> Josh
>>
>> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com>
>> wrote:
>>
>> Hi Josh,
>>
>> The trident kafka spout stores offsets in zookeeper as well. See:
>>
>>
>> https://github.com/apache/storm/tree/master/external/storm-kafka
>>
>> -Taylor
>>
>> On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:
>>
>> Looks like this was some kind of network issue... I ran the topology on
>> another box and don't have the problem any more.
>>
>> I have a related question about the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
>> their partition offsets to use for recovery in the case of failure?
>>
>> I know the normal (non-trident) Kafka spout stores its offsets in
>> Zookeeper, and it has a spout config setting ZkHosts which determines where
>> they are stored. But the trident Kafka spouts don't have this setting.
>> Looking at the source code I can't find any place where it is persisting
>> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
>> persisting its offsets.
>>
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:
>>
>> Hi all,
>>
>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>> Kafka, I'm doing some filtering and aggregation and then calling
>> persistAggregate to maintain a map state:
>>
>> stream
>>       .each(new Fields("msg"), new MyFilter())
>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
>> Fields("id"))
>>       .groupBy(new Fields("id"))
>>       .persistentAggregate(MyMapState.getNonTransactional, new
>> CountAggregator(), new Fields("count"))
>>       .parallelismHint(1)
>>
>> It works fine, for the first batch, but then I am having a very strange
>> problem where after the first batch my map state is no longer called. (i.e.
>> there a call to multiGet and multiPut for the first batch only).
>>
>> The spout is still providing tuples, as when I debug I can see that the
>> filter and function both continue to process input tuples (indefinitely).
>> But the map state never gets called again!
>>
>> Why would this happen?
>>
>> I found a couple of very similar threads to this, but they have gone
>> unanswered:
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>
>>
>> Thanks for any help on this,
>>
>> Josh
>>
>>
>>
>>
>>
>>
>>
>

Re: Trident persistentAggregate only working for the first batch!

Posted by Josh <jo...@gmail.com>.
Hi Nikhil,

Thanks for the reply. OK that makes sense, I was trying to figure out where
trident is storing stuff in zookeeper - turns out when I was using
LocalCluster it was starting its own local Zookeeper instance on port 2000.
But I've figured out how to configure it now: by setting the
storm.zookeeper.servers property in storm.yaml.

I can see it's storing batch IDs under: /transactional/{spout_name} and
like you said deleting the data under the /transactional root has fixed the
exception.

Just wondering, how do the batch IDs translate to Kafka offsets? Is there a
way to find out which Kafka offset trident has consumed up to, by looking
in Zookeeper?

Thanks,
Josh




On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <ns...@yahoo.com> wrote:

> Hi Josh,
> I think you need to clear the Kafka spout's metadata from the trident
> zookeeper. It has old metadata stored and the offset it has stored is no
> longer present in the Kafka queues.
>
> I think you can configure it to ignore metadata and always read from the
> head or tail..
>
> -Nikhil
>
>
>
>
>   On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com> wrote:
>
>
> I still don't have a good understanding of how/where offsets are stored
> when using the trident spouts and how to configure it.
>
> I'm getting this error at the moment (the topology was running fine a
> couple of days ago...) related to offsets, and not sure how to investigate
> it:
>
> *2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [708429188]; retrying with default start offset time
> from configuration.*
> * configured start offset time: [-2]*
> *2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*
>
>
> I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
>
> Any ideas how I can resolve this? :)
>
>
> On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:
>
> Hi Taylor,
>
> Could you be a bit more specific please? I can't find where the wiki
> describes the trident kafka spout storing offsets or where it's done in the
> code base.
>
> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
> property which determines where it stores zookeeper offsets. But
> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout) does not have this property.
>
> Thanks,
> Josh
>
> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com>
> wrote:
>
> Hi Josh,
>
> The trident kafka spout stores offsets in zookeeper as well. See:
>
>
> https://github.com/apache/storm/tree/master/external/storm-kafka
>
> -Taylor
>
> On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:
>
> Looks like this was some kind of network issue... I ran the topology on
> another box and don't have the problem any more.
>
> I have a related question about the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
> their partition offsets to use for recovery in the case of failure?
>
> I know the normal (non-trident) Kafka spout stores its offsets in
> Zookeeper, and it has a spout config setting ZkHosts which determines where
> they are stored. But the trident Kafka spouts don't have this setting.
> Looking at the source code I can't find any place where it is persisting
> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
> persisting its offsets.
>
> On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:
>
> Hi all,
>
> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
> Kafka, I'm doing some filtering and aggregation and then calling
> persistAggregate to maintain a map state:
>
> stream
>       .each(new Fields("msg"), new MyFilter())
>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
> Fields("id"))
>       .groupBy(new Fields("id"))
>       .persistentAggregate(MyMapState.getNonTransactional, new
> CountAggregator(), new Fields("count"))
>       .parallelismHint(1)
>
> It works fine, for the first batch, but then I am having a very strange
> problem where after the first batch my map state is no longer called. (i.e.
> there a call to multiGet and multiPut for the first batch only).
>
> The spout is still providing tuples, as when I debug I can see that the
> filter and function both continue to process input tuples (indefinitely).
> But the map state never gets called again!
>
> Why would this happen?
>
> I found a couple of very similar threads to this, but they have gone
> unanswered:
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>
>
> Thanks for any help on this,
>
> Josh
>
>
>
>
>
>
>

Re: Trident persistentAggregate only working for the first batch!

Posted by Nikhil Singh <ns...@yahoo.com>.
Hi Josh,I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues.
I think you can configure it to ignore metadata and always read from the head or tail..
-Nikhil 



     On Wednesday, May 6, 2015 5:24 AM, Josh <jo...@gmail.com> wrote:
   

 I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it.
I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it:
2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration.
 configured start offset time: [-2]2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!

I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
Any ideas how I can resolve this? :) 


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:

Hi Taylor,
Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base.
The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.
Thanks,Josh
On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

Hi Josh,
The trident kafka spout stores offsets in zookeeper as well. See:

https://github.com/apache/storm/tree/master/external/storm-kafka
-Taylor
On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:


Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.
I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure? 
I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.
On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:

Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:
stream      .each(new Fields("msg"), new MyFilter())      .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))      .groupBy(new Fields("id"))      .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))      .parallelismHint(1)
It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).
The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!
Why would this happen?
I found a couple of very similar threads to this, but they have gone unanswered:https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJhttps://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ


Thanks for any help on this,
Josh








  

Re: Trident persistentAggregate only working for the first batch!

Posted by Josh <jo...@gmail.com>.
I still don't have a good understanding of how/where offsets are stored
when using the trident spouts and how to configure it.

I'm getting this error at the moment (the topology was running fine a
couple of days ago...) related to offsets, and not sure how to investigate
it:

*2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [708429188]; retrying with default start offset time
from configuration.*
* configured start offset time: [-2]*
*2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*


I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout

Any ideas how I can resolve this? :)


On Fri, May 1, 2015 at 2:05 PM, Josh <jo...@gmail.com> wrote:

> Hi Taylor,
>
> Could you be a bit more specific please? I can't find where the wiki
> describes the trident kafka spout storing offsets or where it's done in the
> code base.
>
> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
> property which determines where it stores zookeeper offsets. But
> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout) does not have this property.
>
> Thanks,
> Josh
>
> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com>
> wrote:
>
>> Hi Josh,
>>
>> The trident kafka spout stores offsets in zookeeper as well. See:
>>
>>
>> https://github.com/apache/storm/tree/master/external/storm-kafka
>>
>> -Taylor
>>
>> On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:
>>
>> Looks like this was some kind of network issue... I ran the topology on
>> another box and don't have the problem any more.
>>
>> I have a related question about the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
>> their partition offsets to use for recovery in the case of failure?
>>
>> I know the normal (non-trident) Kafka spout stores its offsets in
>> Zookeeper, and it has a spout config setting ZkHosts which determines where
>> they are stored. But the trident Kafka spouts don't have this setting.
>> Looking at the source code I can't find any place where it is persisting
>> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
>> persisting its offsets.
>>
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>>> Kafka, I'm doing some filtering and aggregation and then calling
>>> persistAggregate to maintain a map state:
>>>
>>> stream
>>>       .each(new Fields("msg"), new MyFilter())
>>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
>>> Fields("id"))
>>>       .groupBy(new Fields("id"))
>>>       .persistentAggregate(MyMapState.getNonTransactional, new
>>> CountAggregator(), new Fields("count"))
>>>       .parallelismHint(1)
>>>
>>> It works fine, for the first batch, but then I am having a very strange
>>> problem where after the first batch my map state is no longer called. (i.e.
>>> there a call to multiGet and multiPut for the first batch only).
>>>
>>> The spout is still providing tuples, as when I debug I can see that the
>>> filter and function both continue to process input tuples (indefinitely).
>>> But the map state never gets called again!
>>>
>>> Why would this happen?
>>>
>>> I found a couple of very similar threads to this, but they have gone
>>> unanswered:
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>>
>>>
>>> Thanks for any help on this,
>>>
>>> Josh
>>>
>>
>>
>

Re: Trident persistentAggregate only working for the first batch!

Posted by Josh <jo...@gmail.com>.
Hi Taylor,

Could you be a bit more specific please? I can't find where the wiki
describes the trident kafka spout storing offsets or where it's done in the
code base.

The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
property which determines where it stores zookeeper offsets. But
TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
OpaqueTridentKafkaSpout) does not have this property.

Thanks,
Josh

On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> Hi Josh,
>
> The trident kafka spout stores offsets in zookeeper as well. See:
>
>
> https://github.com/apache/storm/tree/master/external/storm-kafka
>
> -Taylor
>
> On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:
>
> Looks like this was some kind of network issue... I ran the topology on
> another box and don't have the problem any more.
>
> I have a related question about the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
> their partition offsets to use for recovery in the case of failure?
>
> I know the normal (non-trident) Kafka spout stores its offsets in
> Zookeeper, and it has a spout config setting ZkHosts which determines where
> they are stored. But the trident Kafka spouts don't have this setting.
> Looking at the source code I can't find any place where it is persisting
> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
> persisting its offsets.
>
> On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:
>
>> Hi all,
>>
>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>> Kafka, I'm doing some filtering and aggregation and then calling
>> persistAggregate to maintain a map state:
>>
>> stream
>>       .each(new Fields("msg"), new MyFilter())
>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
>> Fields("id"))
>>       .groupBy(new Fields("id"))
>>       .persistentAggregate(MyMapState.getNonTransactional, new
>> CountAggregator(), new Fields("count"))
>>       .parallelismHint(1)
>>
>> It works fine, for the first batch, but then I am having a very strange
>> problem where after the first batch my map state is no longer called. (i.e.
>> there a call to multiGet and multiPut for the first batch only).
>>
>> The spout is still providing tuples, as when I debug I can see that the
>> filter and function both continue to process input tuples (indefinitely).
>> But the map state never gets called again!
>>
>> Why would this happen?
>>
>> I found a couple of very similar threads to this, but they have gone
>> unanswered:
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>
>>
>> Thanks for any help on this,
>>
>> Josh
>>
>
>

Re: Trident persistentAggregate only working for the first batch!

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Hi Josh,

The trident kafka spout stores offsets in zookeeper as well. See:


https://github.com/apache/storm/tree/master/external/storm-kafka

-Taylor

> On May 1, 2015, at 5:00 AM, Josh <jo...@gmail.com> wrote:
> 
> Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more.
> 
> I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure? 
> 
> I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets.
> 
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:
>> Hi all,
>> 
>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state:
>> 
>> stream
>>       .each(new Fields("msg"), new MyFilter())
>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
>>       .groupBy(new Fields("id"))
>>       .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
>>       .parallelismHint(1)
>> 
>> It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only).
>> 
>> The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again!
>> 
>> Why would this happen?
>> 
>> I found a couple of very similar threads to this, but they have gone unanswered:
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>> 
>> 
>> Thanks for any help on this,
>> 
>> Josh
> 

Re: Trident persistentAggregate only working for the first batch!

Posted by Josh <jo...@gmail.com>.
Looks like this was some kind of network issue... I ran the topology on
another box and don't have the problem any more.

I have a related question about the TransactionalTridentKafkaSpout and
OpaqueTridentKafkaSpout though: does anyone know where these spouts store
their partition offsets to use for recovery in the case of failure?

I know the normal (non-trident) Kafka spout stores its offsets in
Zookeeper, and it has a spout config setting ZkHosts which determines where
they are stored. But the trident Kafka spouts don't have this setting.
Looking at the source code I can't find any place where it is persisting
offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
persisting its offsets.

On Wed, Apr 29, 2015 at 4:29 PM, Josh <jo...@gmail.com> wrote:

> Hi all,
>
> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
> Kafka, I'm doing some filtering and aggregation and then calling
> persistAggregate to maintain a map state:
>
> stream
>       .each(new Fields("msg"), new MyFilter())
>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
> Fields("id"))
>       .groupBy(new Fields("id"))
>       .persistentAggregate(MyMapState.getNonTransactional, new
> CountAggregator(), new Fields("count"))
>       .parallelismHint(1)
>
> It works fine, for the first batch, but then I am having a very strange
> problem where after the first batch my map state is no longer called. (i.e.
> there a call to multiGet and multiPut for the first batch only).
>
> The spout is still providing tuples, as when I debug I can see that the
> filter and function both continue to process input tuples (indefinitely).
> But the map state never gets called again!
>
> Why would this happen?
>
> I found a couple of very similar threads to this, but they have gone
> unanswered:
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>
>
> Thanks for any help on this,
>
> Josh
>