Posted to user@storm.apache.org by yayj <ya...@gmail.com> on 2014/01/01 10:20:22 UTC

PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Hi there,

I recently set out to implement a partitioned Trident spout for Kafka on Storm 0.9.0.1, because storm-contrib/storm-kafka no longer seems to be developed and is not compatible with 0.9. However, my custom Emitter, which implements storm.trident.spout.IPartitionedTridentSpout.Emitter, did not work.

The cause I found was that the lastPartitionMeta parameter of emitPartitionBatchNew was null in EVERY transaction. That is not acceptable, since the JavaDoc of Emitter.emitPartitionBatchNew says it “returns the metadata that can be used to reconstruct this partition/batch in the future” (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java#L52). Furthermore, my implementation is very similar to what storm-kafka does (https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java#L55).

Eventually I traced the root cause to commit d6c2736 (revamp trident spout and partitioned trident spouts to support spouts where the source can change). In this commit, PartitionedTridentSpoutExecutor.Emitter.emitBatch() uses _savedCoordinatorMeta, which is returned by Coordinator.getPartitionsForBatch(), to check whether the partitions have changed, as described in the commit log. If they have, it FIRST CLEARS _partitionStates (line 107: _partitionStates.clear()), which stores ALL partition states, and then invokes Emitter.getOrderedPartitions() to regenerate the partition information (https://github.com/nathanmarz/storm/blob/moved-to-apache/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L105).

Frankly speaking, I think this is too drastic. The semantics of the operation on _partitionStates become REPLACEMENT, but I think they should be UPDATE: adding new partitions, removing lost ones, and KEEPING STABLE ONES. I don’t think it is logical that when one partition changes, all partitions in the same txid must reconstruct their states.
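
To make the update semantics concrete, here is a minimal sketch in Java. It is not Storm code: PartitionStateUpdater, its update method, and the use of String partition ids are all hypothetical names chosen for illustration. The only point is that states of partitions that are still present are carried over instead of being cleared.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Storm.
final class PartitionStateUpdater {
    private PartitionStateUpdater() {}

    // Returns the next id -> state map for the latest partition list:
    // stable partitions keep their existing state object, lost partitions
    // are dropped, and new partitions get a state from the factory.
    static <S> Map<String, S> update(Map<String, S> current,
                                     List<String> latestPartitionIds,
                                     Function<String, S> newState) {
        Map<String, S> next = new LinkedHashMap<>();
        for (String id : latestPartitionIds) {
            S existing = current.get(id);
            next.put(id, existing != null ? existing : newState.apply(id));
        }
        return next;
    }
}
```

With this shape, a change to one partition leaves the state of every other partition untouched, which is the behavior I would expect.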

So I’m wondering whether my understanding is correct. And would it be feasible to send a pull request to https://github.com/apache/incubator-storm ?


-------------
Matt



Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Matt <ya...@gmail.com>.
Hi Edison,

Sounds good. Could you please post a link to your pull request here?

-------------
Matt


On Jan 22, 2014, at 4:06 PM, Edison <xe...@gmail.com> wrote:

> I got the refreshing issue solved. It was caused by a bug: GlobalPartitionInformation did not override the equals() method of Object.
> As a result, every time Trident checks whether the partition information has changed by invoking equals(), the check reports a change and the partitions are refreshed.
> I've submitted a pull request to the author, and it will likely be merged into master soon.


Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Edison <xe...@gmail.com>.
I got the refreshing issue solved. It was caused by a bug:
GlobalPartitionInformation did not override the equals() method of
Object.
As a result, every time Trident checks whether the partition information
has changed by invoking equals(), the default identity comparison reports
a change, and it refreshes the partitions.
I've submitted a pull request to the author, and it will likely be merged
into master soon.
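
A minimal sketch of this kind of fix, in Java. PartitionInfo is a
hypothetical, simplified stand-in and not the actual
GlobalPartitionInformation source; the field and method names are
assumptions. It only shows value-based equals() and hashCode(), so that
two objects describing the same partitions compare as equal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a partition-metadata class; not storm-kafka code.
final class PartitionInfo {
    // Maps a partition id to the broker host that serves it.
    private final Map<Integer, String> brokerByPartition = new HashMap<>();

    void addPartition(int partitionId, String brokerHost) {
        brokerByPartition.put(partitionId, brokerHost);
    }

    // Compares by content instead of by object identity.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof PartitionInfo)) {
            return false;
        }
        return brokerByPartition.equals(((PartitionInfo) other).brokerByPartition);
    }

    // Kept consistent with equals(): equal contents give equal hash codes.
    @Override
    public int hashCode() {
        return brokerByPartition.hashCode();
    }
}
```

Without the override, two freshly built instances with identical contents
are never equal, so a change check based on equals() fires every time.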


2014/1/13 Matt <ya...@gmail.com>

> Hi Edison,
>
> Yes, I have already resolved these problems. And thanks a lot for your
> help.
>
> -------------
> Matt

Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Matt <ya...@gmail.com>.
Hi Edison,

Yes, I have already resolved these problems. And thanks a lot for your help.

-------------
Matt


On Jan 13, 2014, at 4:08 PM, Edison <xe...@gmail.com> wrote:

> Hi Matt,
> 
> On the serialization problem, I feel the same as you. They could use Kryo for better code reuse. I guess this function "just works", and the contributors haven't heard any strong complaints, so they have left it at a lower priority?
> 
> BTW, in Trident the Emitter interface requires an implementation of "void refreshPartitions(List<Partition> partitionResponsibilities);", and the comments say it is used to manage things like connections to brokers.
> The TridentKafkaEmitter implementation in the project I shared with you closes the connection to the Kafka broker every time refreshPartitions is called. So the Kafka server log keeps filling up with junk messages about closed sockets.
> 
> In OpaquePartitionedTridentSpoutExecutor.emitBatch, Storm clearly tries to "refresh" the partitions every time it updates the coordinator meta, but why does it have to do this? To avoid lost connections, or something else?
> Do you have any clue?


Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Edison <xe...@gmail.com>.
Hi Matt,

On the serialization problem, I feel the same as you. They could use
Kryo for better code reuse. I guess this function "just works", and the
contributors haven't heard any strong complaints, so they have left it at
a lower priority?

BTW, in Trident the Emitter interface requires an implementation of "void
refreshPartitions(List<Partition> partitionResponsibilities);", and the
comments say it is used to manage things like connections to brokers.
The TridentKafkaEmitter implementation in the project I shared with you
closes the connection to the Kafka broker every time refreshPartitions is
called. So the Kafka server log keeps filling up with junk messages about
closed sockets.
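
One possible way to avoid that churn, as a rough sketch only: keep the
connections that are still needed and close just the stale ones.
ConnectionCache and all of its members are hypothetical names; this is
not how TridentKafkaEmitter is written, and the connect/close callbacks
stand in for whatever client the spout really uses.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical connection cache; not part of Storm or storm-kafka-0.8-plus.
final class ConnectionCache<C> {
    private final Map<String, C> open = new HashMap<>();
    private final Function<String, C> connect; // opens a connection to a host
    private final Consumer<C> close;           // closes a connection

    ConnectionCache(Function<String, C> connect, Consumer<C> close) {
        this.connect = connect;
        this.close = close;
    }

    // Closes connections to hosts that are no longer needed, keeps the ones
    // that still are, and opens connections to hosts seen for the first time.
    void refresh(Set<String> neededHosts) {
        Iterator<Map.Entry<String, C>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, C> entry = it.next();
            if (!neededHosts.contains(entry.getKey())) {
                close.accept(entry.getValue());
                it.remove();
            }
        }
        for (String host : neededHosts) {
            if (!open.containsKey(host)) {
                open.put(host, connect.apply(host));
            }
        }
    }

    // Returns the open connection for a host, or null if there is none.
    C get(String host) {
        return open.get(host);
    }
}
```

Called from a refreshPartitions implementation with the set of broker
hosts for the new partitions, this would only touch sockets whose broker
actually went away.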

In OpaquePartitionedTridentSpoutExecutor.emitBatch, Storm clearly tries
to "refresh" the partitions every time it updates the coordinator meta,
but why does it have to do this? To avoid lost connections, or something
else?
Do you have any clue?


2014/1/13 Matt <ya...@gmail.com>

> Hi Edison,
>
> Thanks for your reply. It does help, since it works. But I was still
> curious about why, because _partitionStates is also cleared when using
> storm-kafka-0.8-plus.
>
> Eventually I realized that the root cause was not the clearing of
> _partitionStates. The real reason is that I used a custom class to
> represent lastPartitionMeta. My spout implements
> IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while
> storm-kafka-0.8-plus implements
> IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.
>
> The last type argument (Batch/Map) is serialized and stored in
> ZooKeeper. Storm reads the last batch information from ZooKeeper if it
> is not stored locally. But Storm performs this serialization with a
> hardcoded use of json-simple (
> https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52
> ), and json-simple doesn’t support reflection, which means custom
> classes are not supported.
>
> In other words, Storm implicitly requires the batch information to be a
> Map, a String, or another type supported by json-simple. But no
> documentation mentions that.
>
> And why not use Kryo, which is already widely used in Storm, to
> serialize/deserialize the batch information?
>
> -------------
> Matt

Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Matt <ya...@gmail.com>.
Hi Edison,

Thanks for your reply. It does help, since it works. But I was still curious about why, because _partitionStates is also cleared when using storm-kafka-0.8-plus.

Eventually I realized that the root cause was not the clearing of _partitionStates. The real reason is that I used a custom class to represent lastPartitionMeta. My spout implements IOpaquePartitionedTridentSpout<Partitions, Partition, Batch>, while storm-kafka-0.8-plus implements IOpaquePartitionedTridentSpout<Partitions, Partition, Map>.

The last type argument (Batch/Map) is serialized and stored in ZooKeeper. Storm reads the last batch information from ZooKeeper if it is not stored locally. But Storm performs this serialization with a hardcoded use of json-simple (https://github.com/nathanmarz/storm/blob/0.9.0.1/storm-core/src/jvm/storm/trident/topology/state/TransactionalState.java#L52), and json-simple doesn’t support reflection, which means custom classes are not supported.

In other words, Storm implicitly requires the batch information to be a Map, a String, or another type supported by json-simple. But no documentation mentions that.
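
A workaround along these lines, as a minimal sketch: keep the custom class inside the spout, but hand Storm a plain Map. BatchMeta and its fields (offset, nextOffset) are hypothetical names used only for illustration, and the sketch deliberately avoids calling json-simple itself. It assumes the emitter declares Map as its metadata type and converts at the boundaries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical batch metadata class; the field names are illustrative only.
final class BatchMeta {
    final long offset;
    final long nextOffset;

    BatchMeta(long offset, long nextOffset) {
        this.offset = offset;
        this.nextOffset = nextOffset;
    }

    // Flattens the metadata into a Map with String keys and Long values,
    // i.e. only types a plain JSON serializer can be expected to handle.
    Map<String, Object> toMap() {
        Map<String, Object> map = new HashMap<>();
        map.put("offset", offset);
        map.put("nextOffset", nextOffset);
        return map;
    }

    // Rebuilds the metadata from a Map; null means there is no previous batch.
    // Values are read as Number because a JSON round trip may change the
    // concrete numeric type.
    static BatchMeta fromMap(Map<String, Object> map) {
        if (map == null) {
            return null;
        }
        long offset = ((Number) map.get("offset")).longValue();
        long nextOffset = ((Number) map.get("nextOffset")).longValue();
        return new BatchMeta(offset, nextOffset);
    }
}
```

The emitter would call fromMap on the incoming lastPartitionMeta and return toMap() for the new batch, so nothing but a Map ever reaches the serializer.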

And why not use Kryo, which is already widely used in Storm, to serialize/deserialize the batch information?

-------------
Matt


On Jan 6, 2014, at 4:42 PM, Edison <xe...@gmail.com> wrote:

> Check this out: https://github.com/wurstmeister/storm-kafka-0.8-plus


Re: PartitionedTridentSpoutExecutor might cause lastPartitionMeta always becomes null

Posted by Edison <xe...@gmail.com>.
Check this out: https://github.com/wurstmeister/storm-kafka-0.8-plus

