Posted to user@storm.apache.org by Martin Arrowsmith <ar...@gmail.com> on 2014/10/03 20:20:54 UTC

Kafka Spout running ExclamationTopology

Hi Storm Experts!

I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1

I'm trying to get the ExclamationTopology to run with a Kafka Spout (using
the storm-kafka project included with the GitHub repo - not storm-kafka-starter)

Here is the relevant portion of my ExclamationTopology:

    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", "discovery");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    builder.setSpout("kafka", kafkaSpout);

    builder.setBolt("exclaim1", new ExclamationBolt()).shuffleGrouping("kafka");

When I run the topology in local mode for 1 minute, it works for the
first 5 seconds, and then it breaks.

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

So I'm not sure why it says I'm using a String on an Int, and why it worked
before but not now.

In my kafka server.properties:

num.network.threads=2
num.io.threads=8
num.partitions=2

Should I be doing something like this?

builder.setSpout("kafka", kafkaSpout, 2);

Here is the output for when it's working, and then when it starts to break.
Any help would be much appreciated!!

As you can see - it starts to die on Thread 11

Best,

Martin

=================
...

13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting: kafka default [why will you]
13684 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting: kafka __ack_init [-2810730987442558521 -8063320413415537181 3]
13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: default, id: {-2810730987442558521=-8063320413415537181}, [why will you]
13684 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting: exclaim1 default [why will you!!!]
13685 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting: exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: __ack_init, id: {}, [-2810730987442558521 -8063320413415537181 3]
13685 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, [-2810730987442558521 -8063320413415537181]
13686 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 3; __acker __ack_ack [-2810730987442558521]
13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-2810730987442558521]
13687 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
15686 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting: kafka default [hit soon]
15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: default, id: {2311641896827057642=-6472974563298617532}, [hit soon]
15687 [Thread-13-kafka] INFO  backtype.storm.daemon.task - Emitting: kafka __ack_init [2311641896827057642 -6472974563298617532 3]
15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting: exclaim1 default [hit soon!!!]
15687 [Thread-11-exclaim1] INFO  backtype.storm.daemon.task - Emitting: exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
15688 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: __ack_init, id: {}, [2311641896827057642 -6472974563298617532 3]
15689 [Thread-17-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, [2311641896827057642 -6472974563298617532]
15689 [Thread-17-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 3; __acker __ack_ack [2311641896827057642]
15691 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2311641896827057642]
15692 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
17490 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [10]
17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
...
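
A possible reading of the log above, not confirmed anywhere in this thread: the last message Thread-11 handles before dying comes from source __system on stream __tick and carries [10], an Integer rather than a String. If the bolt reads field 0 as a String for every incoming message, that tick message would produce exactly this ClassCastException. The plain-Java sketch below illustrates the idea of a guard. `Msg`, `TickGuardSketch`, and the method names are hypothetical stand-ins, not Storm classes; only the "__system" and "__tick" names are taken from the log.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Storm tuple: just enough fields to mirror the log lines.
class Msg {
    final String sourceComponent;
    final String streamId;
    final List<Object> values;

    Msg(String sourceComponent, String streamId, Object... values) {
        this.sourceComponent = sourceComponent;
        this.streamId = streamId;
        this.values = Arrays.asList(values);
    }
}

class TickGuardSketch {
    // Names copied from the log line "source: __system:-1, stream: __tick".
    static final String SYSTEM_COMPONENT = "__system";
    static final String TICK_STREAM = "__tick";

    static boolean isTick(Msg m) {
        return SYSTEM_COMPONENT.equals(m.sourceComponent) && TICK_STREAM.equals(m.streamId);
    }

    // Returns the exclaimed text, or null when the message is a tick and is skipped.
    static String exclaim(Msg m) {
        if (isTick(m)) {
            return null; // field 0 is an Integer here; casting it to String would throw
        }
        return (String) m.values.get(0) + "!!!";
    }

    // Same logic without the guard, to reproduce the failure mode seen in the trace.
    static String exclaimUnguarded(Msg m) {
        return (String) m.values.get(0) + "!!!";
    }
}
```

With a message built as `new Msg("__system", "__tick", 10)`, `exclaim` skips it while `exclaimUnguarded` throws a ClassCastException. In a real bolt the same check would be made against the tuple's source component and stream id before reading any String field.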

Re: Kafka Spout running ExclamationTopology

Posted by Martin Arrowsmith <ar...@gmail.com>.
I've now switched to Storm 0.9.3. Can someone please tell me how to
use an external ZooKeeper in local mode? It would be very much
appreciated!

Thanks,

Martin

On Fri, Oct 3, 2014 at 3:55 PM, Martin Arrowsmith <
arrowsmith.martin@gmail.com> wrote:

> After some internet and forum searching, it looks like my local mode isn't
> connecting to an external ZooKeeper. Does that sound familiar? If so, how
> come I can still see some spouts?
>
> If it's true, any idea how I can connect to an external ZooKeeper in local
> mode?
>
> Martin

Re: Kafka Spout running ExclamationTopology

Posted by Martin Arrowsmith <ar...@gmail.com>.
After some internet and forum searching, it looks like my local mode isn't
connecting to an external ZooKeeper. Does that sound familiar? If so, how
come I can still see some spouts?

If it's true, any idea how I can connect to an external ZooKeeper in local
mode?

Martin

On Fri, Oct 3, 2014 at 3:01 PM, Martin Arrowsmith <
arrowsmith.martin@gmail.com> wrote:

> Sorry again, just wanted to provide more info.
>
> The other threads seem fine - it's always Thread 11 that errors. Here is
> Thread 13 for instance:
>
> 7040 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Opened spout
> kafka:(3)
> 7044 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Activating
> spout kafka:(3)
> 7045 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Refreshing partition manager connections
> 7047 [Thread-13-kafka-EventThread] INFO
>  org.apache.curator.framework.state.ConnectionStateManager - State change:
> CONNECTED
> 7047 [ConnectionStateManager-0] WARN
>  org.apache.curator.framework.state.ConnectionStateManager - There are no
> ConnectionStateListeners registered.
> 7052 [Thread-13-kafka] INFO  storm.kafka.DynamicBrokersReader - Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=localhost:9092,
> 1=localhost:9092}}
> 7054 [Thread-13-kafka] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned
> [Partition{host=localhost:9092, partition=0},
> Partition{host=localhost:9092, partition=1}]
> 7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Deleted partition managers: []
> 7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] New
> partition managers: [Partition{host=localhost:9092, partition=1},
> Partition{host=localhost:9092, partition=0}]

Re: Kafka Spout running ExclamationTopology

Posted by Martin Arrowsmith <ar...@gmail.com>.
Sorry again, just wanted to provide more info.

The other threads seem fine - it's always Thread 11 that errors. Here is
Thread 13 for instance:

7040 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Opened spout kafka:(3)
7044 [Thread-13-kafka] INFO  backtype.storm.daemon.executor - Activating spout kafka:(3)
7045 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
7047 [Thread-13-kafka-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
7047 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
7052 [Thread-13-kafka] INFO  storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092, 1=localhost:9092}}
7054 [Thread-13-kafka] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=localhost:9092, partition=0}, Partition{host=localhost:9092, partition=1}]
7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
7054 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=localhost:9092, partition=1}, Partition{host=localhost:9092, partition=0}]
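
The "Task [1/1] assigned" line shows the single spout task taking both partitions, which suggests a parallelism hint of 2 is not required just to read a two-partition topic. The sketch below shows how partitions might be spread over spout tasks, assuming a round-robin split by task index. That strategy is an assumption about storm-kafka rather than something stated in this thread, and `PartitionAssignmentSketch` and `partitionsForTask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {
    // Assumed strategy: task i takes partitions i, i + totalTasks, i + 2 * totalTasks, ...
    static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        List<Integer> mine = new ArrayList<Integer>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            mine.add(p);
        }
        return mine;
    }
}
```

Under that assumption, `partitionsForTask(2, 1, 0)` gives the one task both partitions, as in the log, while two tasks would each get a single partition.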

On Fri, Oct 3, 2014 at 2:57 PM, Martin Arrowsmith <
arrowsmith.martin@gmail.com> wrote:

> Hi again,
>
> Does my zkRoot need to equal the value that is actually in ZooKeeper?
> (I'm also new to ZooKeeper.)
>
> Here is Zookeeper:
>
> [zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
> [1, 0]
>
> And here is what I pass into spoutConfig:
>
>   SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",
> "/brokers/mytopic", "1");
>
> Right before the crash, debugging statements say:
>
> 7188 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
> information from: /brokers/mytopic/1/partition_1  --> null
> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
> localhost:1 from offset 60
> 7334 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
> information from: /brokers/mytopic/1/partition_0  --> null
> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
> information found, using configuration to determine offset
> 7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
> localhost:0 from offset 60
> 7335 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
> Finished refreshing
> 17001 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {},
> [10]
> 17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.ClassCastException:
> java.lang.Integer cannot be cast to java.lang.String
> ....
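
The PartitionManager lines quoted above read from /brokers/mytopic/1/partition_1, which is the zkRoot ("/brokers/mytopic") followed by the spout id ("1") and a per-partition node. Judging from the log alone, zkRoot looks like the place where the spout keeps its own offsets, not the /brokers/topics/... path where Kafka registers the topic, and "--> null" would simply mean nothing has been stored there yet. A minimal sketch of that path composition follows; `OffsetPathSketch` and `committedPath` are hypothetical names and the layout is inferred from the log, not taken from the storm-kafka source.

```java
class OffsetPathSketch {
    // Joins zkRoot, the spout id, and a per-partition node name, matching the
    // shape of the paths printed by PartitionManager in the quoted log.
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/" + "partition_" + partition;
    }
}
```

Calling `committedPath("/brokers/mytopic", "1", 1)` reproduces the first path in the log, and the arguments from the original post ("/test", "discovery") would give /test/discovery/partition_0 for partition 0.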

Re: Kafka Spout running ExcalmationTopology

Posted by Martin Arrowsmith <ar...@gmail.com>.
Hi again,

Does my zkRoot need to match a path that actually exists in ZooKeeper? (I'm
also new to ZooKeeper.)

Here is what ZooKeeper shows:

[zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
[1, 0]

And here is what I pass into spoutConfig:

    SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",
        "/brokers/mytopic", "1");
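
A note on how zkRoot seems to be used: the PartitionManager lines in the log
that follows read from /brokers/mytopic/1/partition_1, which looks like
zkRoot + "/" + id + "/partition_" + N. The sketch below only reproduces that
layout from the values passed to SpoutConfig. The class and method names are
made up for illustration and are not the storm-kafka source.

```java
// Sketch only: mirrors the path layout visible in the PartitionManager log
// lines. OffsetPathSketch and committedPath are hypothetical names.
class OffsetPathSketch {

    // Joins zkRoot, the spout id, and "partition_<n>" the way the log suggests.
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values taken from the SpoutConfig call above.
        String zkRoot = "/brokers/mytopic";
        String id = "1";
        // The topic has two partitions (0 and 1) according to the zk listing.
        for (int partition = 0; partition < 2; partition++) {
            System.out.println(committedPath(zkRoot, id, partition));
        }
    }
}
```

If that reading is right, zkRoot is where the spout keeps its own offsets, so
it probably does not have to match the /brokers/topics/... tree that Kafka
maintains, and a "--> null" on first start would just mean no offset has been
saved there yet.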

Right before the crash, the debug output says:

7188 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
information from: /brokers/mytopic/1/partition_1  --> null
7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
information found, using configuration to determine offset
7332 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
localhost:1 from offset 60
7334 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Read partition
information from: /brokers/mytopic/1/partition_0  --> null
7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - No partition
information found, using configuration to determine offset
7335 [Thread-13-kafka] INFO  storm.kafka.PartitionManager - Starting Kafka
localhost:0 from offset 60
7335 [Thread-13-kafka] INFO  storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing
17001 [Thread-11-exclaim1] INFO  backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __tick, id: {},
[10]
17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer
cannot be cast to java.lang.String
....
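
One possible reading of the log above: the last tuple exclaim1 receives before
dying comes from __system on the __tick stream, and its only value is the
integer 10. Assuming the bolt reads field 0 as a String (as the storm-starter
ExclamationBolt does), that would explain "Integer cannot be cast to String".
The sketch below models this without Storm on the classpath. FakeTuple is a
hypothetical stand-in for Storm's Tuple, and the guard is only one way a bolt
might skip tick tuples.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: FakeTuple is a hypothetical stand-in for Storm's Tuple so the
// example runs on its own.
class TickGuardSketch {

    static final class FakeTuple {
        final String sourceComponent;
        final String streamId;
        final List<Object> values;

        FakeTuple(String sourceComponent, String streamId, Object... values) {
            this.sourceComponent = sourceComponent;
            this.streamId = streamId;
            this.values = Arrays.asList(values);
        }

        // Casts the field to String; an Integer field throws ClassCastException.
        String getString(int i) {
            return (String) values.get(i);
        }
    }

    // Component and stream names as they appear in the log: __system / __tick.
    static boolean isTick(FakeTuple t) {
        return "__system".equals(t.sourceComponent) && "__tick".equals(t.streamId);
    }

    // Returns the exclaimed word, or null when the tuple is a tick and is skipped.
    static String exclaim(FakeTuple t) {
        if (isTick(t)) {
            return null;
        }
        return t.getString(0) + "!!!";
    }

    public static void main(String[] args) {
        FakeTuple data = new FakeTuple("kafka", "default", "hit soon");
        FakeTuple tick = new FakeTuple("__system", "__tick", 10);

        System.out.println(exclaim(data)); // guarded path, normal tuple
        System.out.println(exclaim(tick)); // guarded path, tick is skipped

        try {
            tick.getString(0); // unguarded read of the tick's Integer field
        } catch (ClassCastException e) {
            System.out.println("unguarded read failed: ClassCastException");
        }
    }
}
```

In a real bolt the same check would use the tuple's source component and
source stream id. Why exclaim1 gets tick tuples at all is not visible in this
log, so the topology config would be the next place to look.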


