Posted to user@storm.apache.org by Morrigan Jones <mo...@jwplayer.com> on 2016/05/05 15:32:43 UTC

Storm 1.0.0 TransactionalTridentKafkaSpout Issue

I'm in the process of upgrading our Storm code from 0.10.0 to 1.0.0 and
I've run into an issue with TransactionalTridentKafkaSpout. When running
one of our topologies I'm getting the following exception:

Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.Integer
    at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Coordinator.initializeTransaction(PartitionedTridentSpoutExecutor.java:55) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor$Coordinator.initializeTransaction(PartitionedTridentSpoutExecutor.java:43) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.trident.spout.TridentSpoutCoordinator.execute(TridentSpoutCoordinator.java:70) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.0.0.jar:1.0.0]

The issue appears to be caused by a change in
PartitionedTridentSpoutExecutor between the two versions, specifically this
method:

1.0.0 -
https://github.com/apache/storm/blob/v1.0.0/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L51

public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
    if(currMetadata!=null) {
        return currMetadata;
    } else {
        return _coordinator.getPartitionsForBatch();
    }
}

0.10.0 -
https://github.com/apache/storm/blob/v0.10.0/storm-core/src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java#L51

public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
    if(currMetadata!=null) {
        return currMetadata;
    } else {
        return _coordinator.getPartitionsForBatch();
    }
}

The OpaquePartitionedTridentSpoutExecutor still uses Object for the
metadata. Is this a bug in PartitionedTridentSpoutExecutor that is breaking
transactional spouts? Any help will be appreciated.

Thanks!

Re: Storm 1.0.0 TransactionalTridentKafkaSpout Issue

Posted by Morrigan Jones <mo...@jwplayer.com>.
I added a bug to the Storm JIRA for this:
https://issues.apache.org/jira/browse/STORM-1854

On Fri, May 20, 2016 at 12:39 AM, Romain Leroux <le...@gmail.com>
wrote:

> Same issue happened to us; we can't run any of our topologies since they all
> use TransactionalTridentKafkaSpout... Huge regression.
>
> It looks like this is due to:
> org.apache.storm.kafka.trident.Coordinator implements
> IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> ...
> while
> org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor.Coordinator
> implements ITridentSpout.BatchCoordinator<Integer>
> with
> private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;
>
> and of course Integer != List<GlobalPartitionInformation> ...

-- 
Morrigan Jones
Principal Engineer
*JW*PLAYER  |  Your Way to Play
morrigan@jwplayer.com  |  jwplayer.com

Re: Storm 1.0.0 TransactionalTridentKafkaSpout Issue

Posted by Romain Leroux <le...@gmail.com>.
Same issue happened to us; we can't run any of our topologies since they all
use TransactionalTridentKafkaSpout... Huge regression.

It looks like this is due to:
org.apache.storm.kafka.trident.Coordinator implements
IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> ...
while
org.apache.storm.trident.spout.PartitionedTridentSpoutExecutor.Coordinator
implements ITridentSpout.BatchCoordinator<Integer>
with
private IPartitionedTridentSpout.Coordinator<Integer> _coordinator;

and of course Integer != List<GlobalPartitionInformation> ...
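
To illustrate the mismatch described above, here is a minimal, self-contained
sketch rather than the actual Storm code. PartitionCoordinator, ListCoordinator,
IntegerExecutor and ObjectExecutor are hypothetical stand-ins, and the raw-type
constructor parameter is an assumption about how a List-producing coordinator
could end up behind an Integer-typed field. It shows that the compiler-inserted
cast on the Integer return value is what fails, while an Object-typed signature
passes the same metadata through untouched.

```java
import java.util.ArrayList;
import java.util.List;

public class MetadataCastDemo {

    // Hypothetical stand-in for IPartitionedTridentSpout.Coordinator<T>.
    interface PartitionCoordinator<T> {
        T getPartitionsForBatch();
    }

    // Plays the role of the Kafka coordinator: its metadata is a List.
    static class ListCoordinator implements PartitionCoordinator<List<String>> {
        @Override
        public List<String> getPartitionsForBatch() {
            List<String> partitions = new ArrayList<>();
            partitions.add("partition-0");
            return partitions;
        }
    }

    // 1.0.0-style wrapper: metadata is pinned to Integer.
    static class IntegerExecutor {
        private final PartitionCoordinator<Integer> coordinator;

        // Assumption: the coordinator arrives through a raw type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        IntegerExecutor(PartitionCoordinator rawCoordinator) {
            // Unchecked conversion: compiles, but the field can now hold
            // a coordinator that does not produce Integers.
            this.coordinator = rawCoordinator;
        }

        Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
            if (currMetadata != null) {
                return currMetadata;
            }
            // The compiler emits a cast to Integer on this return value,
            // which fails at runtime when the value is really a List.
            return coordinator.getPartitionsForBatch();
        }
    }

    // 0.10.0-style wrapper: metadata is treated as a plain Object.
    static class ObjectExecutor {
        private final PartitionCoordinator<?> coordinator;

        ObjectExecutor(PartitionCoordinator<?> coordinator) {
            this.coordinator = coordinator;
        }

        Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            if (currMetadata != null) {
                return currMetadata;
            }
            // No cast is needed here, so any metadata type passes through.
            return coordinator.getPartitionsForBatch();
        }
    }

    // True if the Integer-typed wrapper fails with a ClassCastException.
    static boolean integerVariantThrows() {
        try {
            new IntegerExecutor(new ListCoordinator()).initializeTransaction(1L, null, null);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // True if the Object-typed wrapper hands back the List unchanged.
    static boolean objectVariantReturnsList() {
        Object metadata = new ObjectExecutor(new ListCoordinator())
                .initializeTransaction(1L, null, null);
        return metadata instanceof List;
    }

    public static void main(String[] args) {
        System.out.println("Integer-typed wrapper throws: " + integerVariantThrows());
        System.out.println("Object-typed wrapper returns a List: " + objectVariantReturnsList());
    }
}
```

Both helper methods should report true. Because generics are erased at
runtime, nothing complains when the wrapper is constructed; the failure only
shows up at the return statement, which matches where the stack trace points.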


