Posted to user@storm.apache.org by Miloš Solujić <mi...@gmail.com> on 2014/07/08 17:36:32 UTC

Kafka trident getting stuck

Hi all,

I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my question is
not too dumb. Here it goes:

I'm using the latest stable Storm and storm-kafka (0.9.2-incubating).

I've set up a test cluster using the wirbelsturm tool with an unchanged yaml
(I just uncommented the kafka machine).

Here is the config snippet for my Trident topology:

        BrokerHosts zk = new ZkHosts("zookeeper1");
        TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");

        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConf.fetchSizeBytes = 10000;
        kafkaConf.forceFromStart = true;

        Config stormConfig = new Config();
        stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
        stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
        stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
        stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
        // performance settings
        stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
        stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
        stormConfig.setMaxSpoutPending(100000);

        if (args != null && args.length > 0) {
            StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
                    BuildTridentScoreTopology.build(kafkaConf));
        } else {...}

I created the 'scores' topic in Kafka and pushed a few test messages
before starting the topology, with kafkaConf.forceFromStart = true. The
topology processed those messages just fine and stored them in the
Trident state (Couchbase).

All new messages are simply ignored!

After redeploying the topology (both with forceFromStart = true and
forceFromStart = false), no more messages are ingested from Kafka.

Here is the worker log for one topology deployment and a short run:
http://pastie.org/private/4xsk6pijvmulwrcg7zgca

These are the VMs that host this Storm cluster:
10.0.0.241 zookeeper1
10.0.0.101 supervisor1
10.0.0.21 kafka1
10.0.0.251 nimbus1

Thanks,
Milos
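
A note on the performance settings above: as Danijel points out later in
this thread, in Trident setMaxSpoutPending counts batches, not tuples. The
following is only a rough back-of-the-envelope sketch. It assumes (this is
an assumption, not something stated in the thread) that each pending batch
can hold up to one fetch of fetchSizeBytes per partition. The class and
method names are hypothetical, and only the JDK is used.

```java
// Hypothetical helper, not part of Storm or storm-kafka.
final class PendingBatchEstimate {

    // Upper bound on bytes in flight, ASSUMING one fetch of
    // fetchSizeBytes per partition in every pending batch.
    static long maxPendingBytes(long maxSpoutPending, long fetchSizeBytes, int partitions) {
        long perBatch = Math.multiplyExact(fetchSizeBytes, (long) partitions);
        return Math.multiplyExact(maxSpoutPending, perBatch);
    }

    public static void main(String[] args) {
        // Values from the config snippet: 100000 pending batches, 10000-byte fetches, 1 partition.
        System.out.println(maxPendingBytes(100000L, 10000L, 1)); // prints 1000000000
        // The much lower value suggested later in the thread.
        System.out.println(maxPendingBytes(10L, 10000L, 1));     // prints 100000
    }
}
```

Under that assumption, lowering the setting from 100000 to 10 shrinks the
bound from about 1 GB to 100 KB for a single partition.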

Re: Kafka trident getting stuck

Posted by Miloš Solujić <mi...@gmail.com>.
Yes
On 10 Jul 2014 14:03, "Danijel Schiavuzzi" <da...@schiavuzzi.com> wrote:

> Did you kill your topology before clearing the Zookeeper data?
>
> On Jul 10, 2014 1:24 PM, "Miloš Solujić" <mi...@gmail.com> wrote:
> >
> > Thanks Danijel for taking interest in my problem.
> >
> > I had exactly the same feeling (that the ZooKeeper data was corrupted), so I
> purged it via zkCli.sh.
> >
> > Now I've got some lower level issues:
> >
> > 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker
> 04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm
> tridentOpaqueTest-topology-3-1404989752 on
> e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> > 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info
> from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> > 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181 sessionTimeout=20000
> watcher=org.apache.curator.ConnectionState@203c6f3e
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection
> established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment
> complete on server zookeeper1/10.0.0.241:2181, sessionid =
> 0x1471fbc929c00f4, negotiated timeout = 20000
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State
> change: CONNECTED
> > 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4
> closed
> > 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> > 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user
> sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection
> established to zookeeper1/10.0.0.241:2181, initiating session
> > 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> > 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment
> complete on server zookeeper1/10.0.0.241:2181, sessionid =
> 0x1471fbc929c00f5, negotiated timeout = 20000
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State
> change: CONNECTED
> > 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> > 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> > java.lang.RuntimeException: java.net.ConnectException: Connection refused
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
> >         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> > Caused by: java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
> >         at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> ~[na:1.6.0_30]
> >         at
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> ~[stormjar.jar:na]
> >         at
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
> ~[stormjar.jar:na]
> >         at
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77)
> ~[stormjar.jar:na]
> >         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
> ~[stormjar.jar:na]
> >         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
> ~[stormjar.jar:na]
> >         at
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> >         ... 6 common frames omitted
> > 2014-07-10 11:00:16 b.s.d.executor [ERROR]
> >
> > I've specified in the simple test topology that I want only one worker, so
> after this error the supervisor boots up another worker,
> which fails with the same error after about a minute of running.
> >
> > What is this about, and how do I fix it? (The setup is the same:
> the default wirbelsturm cluster running on a 64-bit Ubuntu machine.)
> >
> > By the way, here is the new test topology, using the opaque Kafka spout
> (the same thing happens with the transactional one too):
> >
> >
> > public class TridentKafkaOpaqueDeployer {
> >
> >
> >     public static class PrinterBolt extends BaseFunction {
> >         private static final long serialVersionUID =
> -5585127152942983256L;
> >
> >         @Override
> >         public void execute(TridentTuple tuple, TridentCollector
> tridentCollector) {
> >             System.out.println(tuple.toString());
> >         }
> >     }
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
> >         TridentKafkaConfig kafkaConfig = new
> TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
> >
> >         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> > //        kafkaConfig.forceFromStart = true;
> >
> >
> >         TridentTopology topology = new TridentTopology();
> >
> >         topology
> >             .newStream("tridentTestOpaqueSpout", new
> OpaqueTridentKafkaSpout(kafkaConfig))
> >                     .name("tridentTestOpaqueSpout")
> >
> >             .each(new Fields("str"), new PrinterBolt(),
> >                     new Fields("print"));
> >
> >         Config config = new Config();
> >         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
> 2000);
> >         config.setNumWorkers(1);
> >         config.setMaxSpoutPending(3);
> >
> >         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
> topology.build());
> >     }
> > }
> >
> >
> > // note
> >
> > I also resolved the issue with reusing a spout name across topologies, as per
> the recommendation that Nathan gave some time ago:
> > https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
> >
> >
> >
> >
> > On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>
> >> Very strange. Could you try deleting Trident's data in Zookeeper:
> >>
> >> $ sh zkCli.sh
> >> rmr /transactional
> >>
> >> and then resubmitting the topology and repeating your test scenario?
> >>
> >> Maybe the spout's data in Zookeeper somehow got corrupted because
> you are setting forceFromStart in the spout and resubmitting the topology
> multiple times. I think the transactional topology may be left in an
> undefined state in that case.
> >>
> >> You could also enable the LoggingMetricsConsumer in storm.yaml, and
> then check the Kafka spout's kafka.latestOffset metric in metrics.log, and
> compare this offset with the one Kafka's own utility script outputs
> (search under kafka/bin/ for the script).
> >>
> >> On Wednesday, July 9, 2014, Miloš Solujić <mi...@gmail.com>
> wrote:
> >>>
> >>> Yep, I double-checked.
> >>>
> >>> Here is how it's done:
> >>>
> >>> #create topic
> >>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
> --replication-factor 1 --partitions 1 --topic scores
> >>>
> >>> #check what is created
> >>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
> --topic scores
> >>>
> >>> #produce few messages
> >>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic scores
> >>>
> >>> #consumer
> >>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
> --topic scores --from-beginning
> >>>
> >>>
> >>>
> >>>
> >>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's
> activity in the logs.
> >>> I did that; only tick tuples are shipped around, nothing else.
> >>>
> >>> > Also, you should paste all your worker logs (worker-*.log files).
> >>> I forgot to mention that only one worker is configured, precisely to
> simplify things.
> >>>
> >>>
> >>>
> >>> Here is a simplified version of this topology (no Trident state, only
> a simple printer bolt):
> >>>
> >>>
> >>> public class TridentKafkaDeployer {
> >>>
> >>>     public static class PrinterBolt extends BaseFunction {
> >>>         private static final long serialVersionUID =
> -5585127152942983256L;
> >>>
> >>>         @Override
> >>>         public void execute(TridentTuple tuple, TridentCollector
> tridentCollector) {
> >>>             System.out.println(tuple.toString());
> >>>         }
> >>>     }
> >>>
> >>>     public static void main(String[] args) throws Exception {
> >>>
> >>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk,
> "scores");
> >>>
> >>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new
> StringScheme());
> >>>         kafkaConfig.forceFromStart = true;
> >>>
> >>>         TridentTopology topology = new TridentTopology();
> >>>
> >>>         topology
> >>>             .newStream("raw-scores", new
> TransactionalTridentKafkaSpout(kafkaConfig))
> >>>                     .name("kafkaSpout")
> >>>             .each(new Fields("str"), new PrinterBolt(),
> >>>                     new Fields("print"));
> >>>
> >>>
> >>>         Config config = new Config();
> >>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
> 2000);
> >>>         config.setNumWorkers(1);
> >>>         config.setMaxSpoutPending(3);
> >>>
> >>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
> topology.build());
> >>>     }
> >>> }
> >>>
> >>> Exactly the same behaviour (it reads from exactly the same Kafka topic):
> fresh messages in the topic are not picked up.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>
> >>>> Also, you should paste all your worker logs (worker-*.log files).
> >>>>
> >>>>
> >>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <da...@schiavuzzi.com>
> wrote:
> >>>>>
> >>>>> I'd double check the Kafka producer to make sure those messages are
> really getting into the right Kafka topic. Also,
> try enabling Config.setDebug(true) and monitoring the Kafka spout's
> activity in the logs. setMaxSpoutPending should always be set, as by
> default it is unset, so you risk internal queue explosion.
> >>>>>
> >>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
> wrote:
> >>>>>>
> >>>>>> Yep, pretty sure. Via the internal kafka-producer.sh.
> >>>>>> The same method was used to produce the initial messages (before the first
> launch of the topology), which got consumed and processed just fine.
> >>>>>>
> >>>>>> As for maxSpoutPending, I first tried 10, then removed it (leaving the
> default value).
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>>>>
> >>>>>>> Are you sure you are producing new messages into the same Kafka
> topic? What number did you set maxSpoutPending to?
> >>>>>>>
> >>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>> Thanks Danijel for your quick proposition.
> >>>>>>>>
> >>>>>>>> I tried lowering down and removing all performance settings
> (those were left from load testing on one machine)
> >>>>>>>>
> >>>>>>>> Still same result: no matter what, new messages are not taken
> from kafka after topology is redeployed.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
> danijel@schiavuzzi.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value
> (like 10). In Trident, setMaxSpoutPending refers to the number of batches,
> not tuples like in plain Storm. Values that are too high may cause blockages
> like the one you describe.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my
> question is not too dumb. Here it goes:
> >>>>>>>>>>
> >>>>>>>>>> I'm using latest stable storm and storm-kafka =
> 0.9.2-incubating
> >>>>>>>>>>
> >>>>>>>>>> I've setup test cluster using wirbelsturm tool with unchanged
> yaml (just uncommented kafka machine)
> >>>>>>>>>>
> >>>>>>>>>> here is config snippet for my trident topology:
> >>>>>>>>>>
> >>>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
> >>>>>>>>>>         TridentKafkaConfig kafkaConf = new
> TridentKafkaConfig(zk, "scores");
> >>>>>>>>>>
> >>>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new
> StringScheme());
> >>>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
> >>>>>>>>>>         kafkaConf.forceFromStart = true;
> >>>>>>>>>>
> >>>>>>>>>>         Config stormConfig = new Config();
> >>>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
> >>>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
> >>>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
> >>>>>>>>>>         stormConfig.put("couchbase.password",
> COUCHBASE_PASSWORD);
> >>>>>>>>>>         // performance settings
> >>>>>>>>>>
> stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
> >>>>>>>>>>
> stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
> >>>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>         if (args != null && args.length > 0) {
> >>>>>>>>>>
> >>>>>>>>>>
> StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
> >>>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
> >>>>>>>>>>         } else {...}
> >>>>>>>>>>
> >>>>>>>>>> I created the 'scores' topic in Kafka and pushed a few test
> messages before starting the topology, with kafkaConf.forceFromStart = true.
> The topology processed those messages just fine and stored them in the
> Trident state (Couchbase).
> >>>>>>>>>>
> >>>>>>>>>> All new messages are simply ignored!
> >>>>>>>>>>
> >>>>>>>>>> After redeploying the topology (both with forceFromStart = true and
> forceFromStart = false), no more messages are ingested from Kafka.
> >>>>>>>>>>
> >>>>>>>>>> here is worker log for one topology deployment and short run
> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
> >>>>>>>>>>
> >>>>>>>>>> those are VMs that host this storm cluster
> >>>>>>>>>> 10.0.0.241 zookeeper1
> >>>>>>>>>> 10.0.0.101 supervisor1
> >>>>>>>>>> 10.0.0.21 kafka1
> >>>>>>>>>> 10.0.0.251 nimbus1
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Milos
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Danijel Schiavuzzi
> >>>>>>>>>
> >>>>>>>>> E: danijel@schiavuzzi.com
> >>>>>>>>> W: www.schiavuzzi.com
> >>>>>>>>> T: +385989035562
> >>>>>>>>> Skype: danijels7
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Danijel Schiavuzzi
> >>>>>>>
> >>>>>>> E: danijel@schiavuzzi.com
> >>>>>>> W: www.schiavuzzi.com
> >>>>>>> T: +385989035562
> >>>>>>> Skype: danijels7
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Danijel Schiavuzzi
> >>>>>
> >>>>> E: danijel@schiavuzzi.com
> >>>>> W: www.schiavuzzi.com
> >>>>> T: +385989035562
> >>>>> Skype: danijels7
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Danijel Schiavuzzi
> >>>>
> >>>> E: danijel@schiavuzzi.com
> >>>> W: www.schiavuzzi.com
> >>>> T: +385989035562
> >>>> Skype: danijels7
> >>>
> >>>
> >>
> >>
> >> --
> >> Danijel Schiavuzzi
> >>
> >> E: danijel@schiavuzzi.com
> >> W: www.schiavuzzi.com
> >> T: +385989035562
> >> Skype: danijels7
> >
> >
>
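
The worker log quoted above contains the line
"partitionMap={0=localhost:9092}", i.e. the broker address that storm-kafka
read from ZooKeeper is localhost:9092, and the stack trace ends in a refused
connection from the Kafka SimpleConsumer. One possible reading (an inference,
not confirmed in this part of the thread) is that the worker on supervisor1
is trying to reach a broker on its own loopback interface rather than on
kafka1 (10.0.0.21). The sketch below, with hypothetical class and method
names and only JDK classes, parses such a log line and flags loopback broker
addresses.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic helper, not part of Storm or storm-kafka.
final class PartitionMapCheck {

    // Matches entries such as "0=localhost:9092" inside partitionMap={...}.
    private static final Pattern ENTRY =
            Pattern.compile("(\\d+)=([^:,{}\\s]+):(\\d+)");

    // Returns partition id -> "host:port" for every entry found in the log line.
    static Map<Integer, String> parse(String logLine) {
        Map<Integer, String> result = new LinkedHashMap<>();
        int start = logLine.indexOf("partitionMap={");
        if (start < 0) {
            return result; // no partition map in this line
        }
        Matcher m = ENTRY.matcher(logLine.substring(start));
        while (m.find()) {
            result.put(Integer.parseInt(m.group(1)), m.group(2) + ":" + m.group(3));
        }
        return result;
    }

    // True if the host part resolves to a loopback address (127.0.0.0/8 or ::1).
    static boolean isLoopback(String hostPort) {
        String host = hostPort.substring(0, hostPort.lastIndexOf(':'));
        try {
            return InetAddress.getByName(host).isLoopbackAddress();
        } catch (UnknownHostException e) {
            return false; // unresolvable is a different problem; not flagged here
        }
    }

    public static void main(String[] args) {
        String line = "Read partition info from zookeeper: "
                + "GlobalPartitionInformation{partitionMap={0=localhost:9092}}";
        for (Map.Entry<Integer, String> e : parse(line).entrySet()) {
            String note = isLoopback(e.getValue())
                    ? " (loopback: only reachable from the broker's own host)"
                    : "";
            System.out.println("partition " + e.getKey() + " -> " + e.getValue() + note);
        }
    }
}
```

If the address does turn out to be a loopback one, the host name the broker
advertises in its Kafka configuration would be the first thing to check.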

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Did you kill your topology before clearing the Zookeeper data?

On Jul 10, 2014 1:24 PM, "Miloš Solujić" <mi...@gmail.com> wrote:
>
> Thanks Danijel for taking interest in my problem.
>
> I had exactly the same feeling (that the ZooKeeper data was corrupted), so I
purged it via zkCli.sh.
>
> Now I've got some lower level issues:
>
> 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker
04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm
tridentOpaqueTest-topology-3-1404989752 on
e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info
from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=zookeeper1:2181 sessionTimeout=20000
watcher=org.apache.curator.ConnectionState@203c6f3e
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to
server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
SASL (unknown error)
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection established
to zookeeper1/10.0.0.241:2181, initiating session
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment
complete on server zookeeper1/10.0.0.241:2181, sessionid =
0x1471fbc929c00f4, negotiated timeout = 20000
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State change:
CONNECTED
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no
ConnectionStateListeners registered.
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4
closed
> 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user
sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to
server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
SASL (unknown error)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection established
to zookeeper1/10.0.0.241:2181, initiating session
> 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment
complete on server zookeeper1/10.0.0.241:2181, sessionid =
0x1471fbc929c00f5, negotiated timeout = 20000
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State change:
CONNECTED
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no
ConnectionStateListeners registered.
> 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.net.ConnectException: Connection refused
>         at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> Caused by: java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
>         at
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
~[na:1.6.0_30]
>         at
kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
~[stormjar.jar:na]
>         at
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
~[stormjar.jar:na]
>         at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
~[stormjar.jar:na]
>         at
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
~[stormjar.jar:na]
>         at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77)
~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
~[stormjar.jar:na]
>         at
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
~[stormjar.jar:na]
>         at
storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         ... 6 common frames omitted
> 2014-07-10 11:00:16 b.s.d.executor [ERROR]
>
> I've specified in the simple test topology that I want only one worker, so
after this error the supervisor boots up another worker,
which fails with the same error after about a minute of running.
>
> What is this about, and how do I fix it? (The setup is the same:
the default wirbelsturm cluster running on a 64-bit Ubuntu machine.)
>
> By the way, here is the new test topology, using the opaque Kafka spout
(the same thing happens with the transactional one too):
>
>
> public class TridentKafkaOpaqueDeployer {
>
>
>     public static class PrinterBolt extends BaseFunction {
>         private static final long serialVersionUID =
-5585127152942983256L;
>
>         @Override
>         public void execute(TridentTuple tuple, TridentCollector
tridentCollector) {
>             System.out.println(tuple.toString());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConfig = new
TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
>
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> //        kafkaConfig.forceFromStart = true;
>
>
>         TridentTopology topology = new TridentTopology();
>
>         topology
>             .newStream("tridentTestOpaqueSpout", new
OpaqueTridentKafkaSpout(kafkaConfig))
>                     .name("tridentTestOpaqueSpout")
>
>             .each(new Fields("str"), new PrinterBolt(),
>                     new Fields("print"));
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
2000);
>         config.setNumWorkers(1);
>         config.setMaxSpoutPending(3);
>
>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
topology.build());
>     }
> }
>
>
> // note
>
> I also resolved the issue with reusing a spout name across topologies, as per
the recommendation that Nathan gave some time ago:
> https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
>
>
>
>
> On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:
>>
>> Very strange. Could you try deleting Trident's data in Zookeeper:
>>
>> $ sh zkCli.sh
>> rmr /transactional
>>
>> and then resubmitting the topology and repeating your test scenario?
>>
>> Maybe the spout's data in Zookeeper somehow got corrupted because
you are setting forceFromStart in the spout and resubmitting the topology
multiple times. I think the transactional topology may be left in an
undefined state in that case.
>>
>> You could also enable the LoggingMetricsConsumer in storm.yaml, and then
check the Kafka spout's kafka.latestOffset metric in metrics.log, and
compare this offset with the one Kafka's own utility script outputs
(search under kafka/bin/ for the script).
>>
>> On Wednesday, July 9, 2014, Miloš Solujić <mi...@gmail.com>
wrote:
>>>
>>> Yep, I double-checked.
>>>
>>> Here is how it's done:
>>>
>>> #create topic
>>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
--replication-factor 1 --partitions 1 --topic scores
>>>
>>> #check what is created
>>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
--topic scores
>>>
>>> #produce few messages
>>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
--topic scores
>>>
>>> #consumer
>>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
--topic scores --from-beginning
>>>
>>>
>>>
>>>
>>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's
activity in the logs.
>>> I did that; only tick tuples are shipped around, nothing else.
>>>
>>> > Also, you should paste all your worker logs (worker-*.log files).
>>> I forgot to mention that only one worker is configured, precisely to
simplify things.
>>>
>>>
>>>
>>> Here is a simplified version of this topology (no Trident state, only
a simple printer bolt):
>>>
>>>
>>> public class TridentKafkaDeployer {
>>>
>>>     public static class PrinterBolt extends BaseFunction {
>>>         private static final long serialVersionUID =
-5585127152942983256L;
>>>
>>>         @Override
>>>         public void execute(TridentTuple tuple, TridentCollector
tridentCollector) {
>>>             System.out.println(tuple.toString());
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk,
"scores");
>>>
>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new
StringScheme());
>>>         kafkaConfig.forceFromStart = true;
>>>
>>>         TridentTopology topology = new TridentTopology();
>>>
>>>         topology
>>>             .newStream("raw-scores", new
TransactionalTridentKafkaSpout(kafkaConfig))
>>>                     .name("kafkaSpout")
>>>             .each(new Fields("str"), new PrinterBolt(),
>>>                     new Fields("print"));
>>>
>>>
>>>         Config config = new Config();
>>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
2000);
>>>         config.setNumWorkers(1);
>>>         config.setMaxSpoutPending(3);
>>>
>>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
topology.build());
>>>     }
>>> }
>>>
>>> Exactly same behaviour (it goes to exactly same kafka topic) = no
picking up fresh messages in kafka topic.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <
danijel@schiavuzzi.com> wrote:
>>>>
>>>> Also, you should paste all your worker logs (worker-*.log files).
>>>>
>>>>
>>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:
>>>>>
>>>>> I'd double check the Kafka producer to make sure those messages are
really getting into the right Kafka topic. Also,
try enabling Config.setDebug(true) and monitoring the Kafka spout's
activity in the logs. setMaxSpoutPending should always be set, as by
default it is unset, so you risk internal queue explosion.
>>>>>
>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
wrote:
>>>>>>
>>>>>> Yep. pretty much sure. Via internal kafka-producer.sh
>>>>>> same method is used to produce initial messages (before first launch
of topology, that got consumed and processed just fine)
>>>>>>
>>>>>> as for maxSpoutPending, first I tried with 10, then removed it (left
default value)
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <
danijel@schiavuzzi.com> wrote:
>>>>>>>
>>>>>>> Are you sure you are producing new messages into the same Kafka
topic? What number did you set maxSpoutPending to?
>>>>>>>
>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
wrote:
>>>>>>>>
>>>>>>>> Thanks Danijel for your quick proposition.
>>>>>>>>
>>>>>>>> I tried lowering down and removing all performance settings (those
were left from load testing on one machine)
>>>>>>>>
>>>>>>>> Still same result: no matter what, new messages are not taken from
kafka after topology is redeployed.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
danijel@schiavuzzi.com> wrote:
>>>>>>>>>
>>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value
(like 10). In Trident, setMaxSpoutPending refers to the number of batches,
not tuples like in plain Storm. Too high values may cause blockages like
the one you describe.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I'm pretty new to storm and kafka/zookeeper, and I hope that my
question is not too dumb. Here it goes:
>>>>>>>>>>
>>>>>>>>>> I'm using latest stable storm and storm-kafka = 0.9.2-incubating
>>>>>>>>>>
>>>>>>>>>> I've setup test cluster using wirbelsturm tool with unchanged
yaml (just uncommented kafka machine)
>>>>>>>>>>
>>>>>>>>>> here is config snippet for my trident topology:
>>>>>>>>>>
>>>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>>>>>>>         TridentKafkaConfig kafkaConf = new
TridentKafkaConfig(zk, "scores");
>>>>>>>>>>
>>>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new
StringScheme());
>>>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
>>>>>>>>>>         kafkaConf.forceFromStart = true;
>>>>>>>>>>
>>>>>>>>>>         Config stormConfig = new Config();
>>>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>>>>>>>         stormConfig.put("couchbase.password",
COUCHBASE_PASSWORD);
>>>>>>>>>>         // performance settings
>>>>>>>>>>
stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>>>>>>>
stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>         if (args != null && args.length > 0) {
>>>>>>>>>>
>>>>>>>>>>
StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>>>>>>>>         } else {...}
>>>>>>>>>>
>>>>>>>>>> Now, I've created 'scores' topic in kafka and pushed a few test
messages prior to starting topology, with kafkaConf.forceFromStart = true.
And topology processed those messages just fine, and stored them in
tridentState (Couchbase)
>>>>>>>>>>
>>>>>>>>>> All new messages are simply ignored!
>>>>>>>>>>
>>>>>>>>>> After redeploying topology (both with forceFromStart = true and
forceFromStart = false) no more messages are ingested from kafka.
>>>>>>>>>>
>>>>>>>>>> here is worker log for one topology deployment and short run
http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>>>>>>>
>>>>>>>>>> those are VMs that host this storm cluster
>>>>>>>>>> 10.0.0.241 zookeeper1
>>>>>>>>>> 10.0.0.101 supervisor1
>>>>>>>>>> 10.0.0.21 kafka1
>>>>>>>>>> 10.0.0.251 nimbus1
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Milos
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>
>>>>>>>>> E: danijel@schiavuzzi.com
>>>>>>>>> W: www.schiavuzzi.com
>>>>>>>>> T: +385989035562
>>>>>>>>> Skype: danijels7
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Danijel Schiavuzzi
>>>>>>>
>>>>>>> E: danijel@schiavuzzi.com
>>>>>>> W: www.schiavuzzi.com
>>>>>>> T: +385989035562
>>>>>>> Skype: danijels7
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Danijel Schiavuzzi
>>>>>
>>>>> E: danijel@schiavuzzi.com
>>>>> W: www.schiavuzzi.com
>>>>> T: +385989035562
>>>>> Skype: danijels7
>>>>
>>>>
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> E: danijel@schiavuzzi.com
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
>>>
>>>
>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: danijel@schiavuzzi.com
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>
>

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
That's right: the stream (spout) names you pass to newStream() must be
unique cluster-wide, because they are included in the Zookeeper node path.
Otherwise data corruption may occur, as multiple Trident spouts access and
modify the same Zookeeper data.
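
To illustrate the naming rule, here is a minimal sketch, not part of Storm
itself. The class and method names (TridentStreamNames, uniqueStreamName,
zkStatePath) are hypothetical helpers, and "/transactional" is assumed to be
the state root because that is the path visible in the quoted worker log
(zookeeper1:2181/transactional/tridentTestOpaqueSpout/user). The idea is
simply to derive the newStream() name from the topology name so that two
topologies never share a Zookeeper node.

```java
public final class TridentStreamNames {

    // Assumption: Trident keeps spout state under this root; it matches the
    // connect string seen in the worker log quoted in this thread.
    public static final String TRANSACTIONAL_ROOT = "/transactional";

    private TridentStreamNames() {
    }

    // Hypothetical convention: prefix the spout label with the topology name
    // so the resulting stream name differs between topologies.
    public static String uniqueStreamName(String topologyName, String spoutLabel) {
        return topologyName + "-" + spoutLabel;
    }

    // The Zookeeper node a given stream name would map to under the root.
    public static String zkStatePath(String streamName) {
        return TRANSACTIONAL_ROOT + "/" + streamName;
    }

    public static void main(String[] args) {
        // Same spout label, two topologies: the state paths no longer collide.
        String first = uniqueStreamName("scores-topology", "kafka-spout");
        String second = uniqueStreamName("opaque-test-topology", "kafka-spout");
        System.out.println(zkStatePath(first));
        System.out.println(zkStatePath(second));
    }
}
```

The value returned by uniqueStreamName() would then be passed as the first
argument of newStream() in place of a hard-coded name.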

The ConnectException seems to be thrown by the Kafka consumer in your
Kafka spout. Can you check that your Kafka broker is up and running?
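
One rough way to run that check is a plain TCP connect from the machine
where the worker runs. The sketch below uses only java.net and is written
to compile on Java 6, which is what the stack trace shows the worker using.
The class name BrokerReachability and the defaults in main() are
assumptions for illustration. The quoted worker log shows the spout reading
partitionMap={0=localhost:9092} from Zookeeper, so it may be worth testing
that exact address from the supervisor host as well as the broker's real
hostname.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public final class BrokerReachability {

    private BrokerReachability() {
    }

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    // Any I/O failure (connection refused, unknown host, timeout) yields false.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing the probe socket fails.
            }
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the address seen in the worker log; override via arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

A false result only says that nothing accepted a connection at that
address from that machine; it does not tell whether the broker is down or
is listening on a different host or interface.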


On Thu, Jul 10, 2014 at 1:23 PM, Miloš Solujić <mi...@gmail.com>
wrote:

> Thanks Danijel for taking interest in my problem.
>
> I had exactly the same feeling (that the zookeeper data is corrupted), so I
> purged it via zkCli.sh.
>
> Now I've got some lower-level issues:
>
> 2014-07-10 11:00:13 b.s.d.worker [INFO] Worker
> 04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm
> tridentOpaqueTest-topology-3-1404989752 on
> e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
> 2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info
> from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
> 2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181 sessionTimeout=20000
> watcher=org.apache.curator.ConnectionState@203c6f3e
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection established
> to zookeeper1/10.0.0.241:2181, initiating session
> 2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment complete
> on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f4,
> negotiated timeout = 20000
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State change:
> CONNECTED
> 2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4
> closed
> 2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
> 2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection,
> connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user
> sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to
> server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
> SASL (unknown error)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection established
> to zookeeper1/10.0.0.241:2181, initiating session
> 2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
> 2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment complete
> on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f5,
> negotiated timeout = 20000
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State change:
> CONNECTED
> 2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no
> ConnectionStateListeners registered.
> 2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.net.ConnectException: Connection refused
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
> Caused by: java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
>         at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> ~[na:1.6.0_30]
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> ~[stormjar.jar:na]
>         at
> kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
> ~[stormjar.jar:na]
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> ~[stormjar.jar:na]
>         at
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
> ~[stormjar.jar:na]
>         at
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> ~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77)
> ~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
> ~[stormjar.jar:na]
>         at
> storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
> ~[stormjar.jar:na]
>         at
> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>         ... 6 common frames omitted
> 2014-07-10 11:00:16 b.s.d.executor [ERROR]
>
> In this simple test topology I've asked for only one worker, so after this
> error the supervisor boots up another worker, which fails with the same
> error after about a minute of running.
>
> What is this about, and how can I fix it? (The setup is the same: the
> default wirbelsturm cluster running on a 64-bit Ubuntu machine.)
>
> By the way, here is the new test topology, using the opaque Kafka spout (the
> same thing happens with the transactional one too):
>
>
> public class TridentKafkaOpaqueDeployer {
>
>     public static class PrinterBolt extends BaseFunction {
>         private static final long serialVersionUID = -5585127152942983256L;
>
>         @Override
>         public void execute(TridentTuple tuple, TridentCollector
> tridentCollector) {
>             System.out.println(tuple.toString());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConfig = new
> TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");
>
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> //        kafkaConfig.forceFromStart = true;
>
>         TridentTopology topology = new TridentTopology();
>
>         topology
>             .newStream("tridentTestOpaqueSpout", new
> OpaqueTridentKafkaSpout(kafkaConfig))
>                     .name("tridentTestOpaqueSpout")
>             .each(new Fields("str"), new PrinterBolt(),
>                     new Fields("print"));
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
> 2000);
>         config.setNumWorkers(1);
>         config.setMaxSpoutPending(3);
>
>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
> topology.build());
>     }
> }
>
>
> // note
>
> I also resolved issue with reusing spout name across topologies, as per
> recommendation that Nathan gave some time ago
> https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY
>
>
>
>
> On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <danijel@schiavuzzi.com
> > wrote:
>
>> Very strange. Could you try deleting Trident's data in Zookeeper:
>>
>> $ sh zkCli.sh
>> rmr /transactional
>>
>> and then resubmitting the topology and repeating your test scenario?
>>
>> Maybe the spout's data in Zookeeper somehow got corrupted because you
>> are setting forceFromStart in the spout, and resubmitting the topology
>> multiple times. I think the transactional topology may be left in an
>> undefined state in that case.
>>
>> You could also enable the LoggingMetricsConsumer in storm.yaml, and then
>> check the Kafka spout's kafka.latestOffset metric in metrics.log, and
>> compare this offset with the one Kafka's own utility script outputs
>> (search under kafka/bin/ for the script).
>>
>> On Wednesday, July 9, 2014, Miloš Solujić <mi...@gmail.com>
>> wrote:
>>
>>> Yep, I double-checked.
>>>
>>> Here is how it's done:
>>>
>>> #create topic
>>> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
>>> --replication-factor 1 --partition 1 --topic scores
>>>
>>> #check what is created
>>> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
>>> --topic scores
>>>
>>> #produce few messages
>>> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
>>> --topic scores
>>>
>>> #consumer
>>> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
>>> --topic scores --from-beginning
>>>
>>>
>>>
>>>
>>> > try enabling Config.setDebug(true) and monitoring the Kafka spout's
>>> activity in the logs.
>>> did that, only tick tuples are shipped around, nothing else
>>>
>>> > Also, you should paste all your worker logs (worker-*.log files).
>>> Forgot to mention, only one worker is set, exactly for reason to
>>> simplify things.
>>>
>>>
>>>
>>> Here is simplified version of this topology (no trident state, only
>>> simple printer bolt)
>>>
>>>
>>> public class TridentKafkaDeployer {
>>>
>>>     public static class PrinterBolt extends BaseFunction {
>>>         private static final long serialVersionUID =
>>> -5585127152942983256L;
>>>
>>>         @Override
>>>         public void execute(TridentTuple tuple, TridentCollector
>>> tridentCollector) {
>>>             System.out.println(tuple.toString());
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk,
>>> "scores");
>>>
>>>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>         kafkaConfig.forceFromStart = true;
>>>
>>>         TridentTopology topology = new TridentTopology();
>>>
>>>         topology
>>>             .newStream("raw-scores", new
>>> TransactionalTridentKafkaSpout(kafkaConfig))
>>>                     .name("kafkaSpout")
>>>             .each(new Fields("str"), new PrinterBolt(),
>>>                     new Fields("print"));
>>>
>>>
>>>         Config config = new Config();
>>>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
>>> 2000);
>>>         config.setNumWorkers(1);
>>>         config.setMaxSpoutPending(3);
>>>
>>>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
>>> topology.build());
>>>     }
>>> }
>>>
>>> Exactly same behaviour (it goes to exactly same kafka topic) = no
>>> picking up fresh messages in kafka topic.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <
>>> danijel@schiavuzzi.com> wrote:
>>>
>>>> Also, you should paste all your worker logs (worker-*.log files).
>>>>
>>>>
>>>> On Tuesday, July 8, 2014, Danijel Schiavuzzi <da...@schiavuzzi.com>
>>>> wrote:
>>>>
>>>>> I'd double check the Kafka producer to make sure those messages are
>>>>> really getting into the right Kafka topic. Also,
>>>>> try enabling Config.setDebug(true) and monitoring the Kafka spout's
>>>>> activity in the logs. setMaxSpoutPending should always be set, as by
>>>>> default it is unset, so you risk internal queue explosion.
>>>>>
>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yep. pretty much sure. Via internal kafka-producer.sh
>>>>>> same method is used to produce initial messages (before first launch
>>>>>> of topology, that got consumed and processed just fine)
>>>>>>
>>>>>> as for maxSpoutPending, first I tried with 10, then removed it (left
>>>>>> default value)
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <
>>>>>> danijel@schiavuzzi.com> wrote:
>>>>>>
>>>>>>> Are you sure you are producing new messages into the same Kafka
>>>>>>> topic? What number did you set maxSpoutPending to?
>>>>>>>
>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Danijel for your quick proposition.
>>>>>>>>
>>>>>>>> I tried lowering down and removing all performance settings (those
>>>>>>>> were left from load testing on one machine)
>>>>>>>>
>>>>>>>> Still same result: no matter what, new messages are not taken from
>>>>>>>> kafka after topology is redeployed.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <
>>>>>>>> danijel@schiavuzzi.com> wrote:
>>>>>>>>
>>>>>>>>> Try lowering setMaxSpoutPending(100000) to a much lower value
>>>>>>>>> (like 10). In Trident, setMaxSpoutPending refers to the number of batches,
>>>>>>>>> not tuples like in plain Storm. Too high values may cause blockages like
>>>>>>>>> the one you describe.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I'm pretty new to storm and kafka/zookeeper, and I hope that my
>>>>>>>>>> question is not too dumb. Here it goes:
>>>>>>>>>>
>>>>>>>>>> I'm using latest stable storm and storm-kafka = 0.9.2-incubating
>>>>>>>>>>
>>>>>>>>>> I've setup test cluster using wirbelsturm tool with unchanged
>>>>>>>>>> yaml (just uncommented kafka machine)
>>>>>>>>>>
>>>>>>>>>> here is config snippet for my trident topology:
>>>>>>>>>>
>>>>>>>>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>>>>>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk,
>>>>>>>>>> "scores");
>>>>>>>>>>
>>>>>>>>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new
>>>>>>>>>> StringScheme());
>>>>>>>>>>         kafkaConf.fetchSizeBytes = 10000;
>>>>>>>>>>         kafkaConf.forceFromStart = true;
>>>>>>>>>>
>>>>>>>>>>         Config stormConfig = new Config();
>>>>>>>>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>>>>>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>>>>>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>>>>>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>>>>>>>         // performance settings
>>>>>>>>>>
>>>>>>>>>> stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>>>>>>>
>>>>>>>>>> stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>>>>>>>         stormConfig.setMaxSpoutPending(100000);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>         if (args != null && args.length > 0) {
>>>>>>>>>>
>>>>>>>>>>             StormSubmitter.submitTopologyWithProgressBar(args[0],
>>>>>>>>>> stormConfig,
>>>>>>>>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>>>>>>>>         } else {...}
>>>>>>>>>>
>>>>>>>>>> Now, I've created 'scores' topic in kafka and pushed a few test
>>>>>>>>>> messages prior to starting topology, with kafkaConf.forceFromStart = true.
>>>>>>>>>> And topology processed those messages just fine, and stored them in
>>>>>>>>>> tridentState (Couchbase)
>>>>>>>>>>
>>>>>>>>>> All new messages are simply ignored!
>>>>>>>>>>
>>>>>>>>>> After redeploying topology (both with forceFromStart = true and
>>>>>>>>>> forceFromStart = false) no more messages are ingested from kafka.
>>>>>>>>>>
>>>>>>>>>> here is worker log for one topology deployment and short run
>>>>>>>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>>>>>>>
>>>>>>>>>> those are VMs that host this storm cluster
>>>>>>>>>> 10.0.0.241 zookeeper1
>>>>>>>>>> 10.0.0.101 supervisor1
>>>>>>>>>> 10.0.0.21 kafka1
>>>>>>>>>> 10.0.0.251 nimbus1
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Milos
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Danijel Schiavuzzi
>>>>>>>>>
>>>>>>>>> E: danijel@schiavuzzi.com
>>>>>>>>> W: www.schiavuzzi.com
>>>>>>>>> T: +385989035562
>>>>>>>>> Skype: danijels7
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Danijel Schiavuzzi
>>>>>>>
>>>>>>> E: danijel@schiavuzzi.com
>>>>>>> W: www.schiavuzzi.com
>>>>>>> T: +385989035562
>>>>>>> Skype: danijels7
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Danijel Schiavuzzi
>>>>>
>>>>> E: danijel@schiavuzzi.com
>>>>> W: www.schiavuzzi.com
>>>>> T: +385989035562
>>>>> Skype: danijels7
>>>>>
>>>>
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> E: danijel@schiavuzzi.com
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
>>>>
>>>
>>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: danijel@schiavuzzi.com
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>>
>
>


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Kafka trident getting stuck

Posted by Miloš Solujić <mi...@gmail.com>.
Thanks Danijel for taking interest in my problem.

I had exactly the same feeling (that the zookeeper data is corrupted), so I
purged it via zkCli.sh.

Now I've got some lower-level issues:

2014-07-10 11:00:13 b.s.d.worker [INFO] Worker
04a17a6b-5aea-47ce-808b-218c4bcc1d51 for storm
tridentOpaqueTest-topology-3-1404989752 on
e4cdd619-c7e4-40f0-941e-352ac41daf6e:6701 has finished loading
2014-07-10 11:00:14 s.k.DynamicBrokersReader [INFO] Read partition info
from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
2014-07-10 11:00:14 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-07-10 11:00:14 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=zookeeper1:2181 sessionTimeout=20000
watcher=org.apache.curator.ConnectionState@203c6f3e
2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Opening socket connection to
server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
SASL (unknown error)
2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Socket connection established
to zookeeper1/10.0.0.241:2181, initiating session
2014-07-10 11:00:14 o.a.z.ClientCnxn [INFO] Session establishment complete
on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f4,
negotiated timeout = 20000
2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [INFO] State change:
CONNECTED
2014-07-10 11:00:14 o.a.c.f.s.ConnectionStateManager [WARN] There are no
ConnectionStateListeners registered.
2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Session: 0x1471fbc929c00f4 closed
2014-07-10 11:00:15 o.a.c.f.i.CuratorFrameworkImpl [INFO] Starting
2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] EventThread shut down
2014-07-10 11:00:15 o.a.z.ZooKeeper [INFO] Initiating client connection,
connectString=zookeeper1:2181/transactional/tridentTestOpaqueSpout/user
sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@4fa983e
2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Opening socket connection to
server zookeeper1/10.0.0.241:2181. Will not attempt to authenticate using
SASL (unknown error)
2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Socket connection established
to zookeeper1/10.0.0.241:2181, initiating session
2014-07-10 11:00:15 b.s.d.executor [INFO] Prepared bolt spout0:(5)
2014-07-10 11:00:15 o.a.z.ClientCnxn [INFO] Session establishment complete
on server zookeeper1/10.0.0.241:2181, sessionid = 0x1471fbc929c00f5,
negotiated timeout = 20000
2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [INFO] State change:
CONNECTED
2014-07-10 11:00:15 o.a.c.f.s.ConnectionStateManager [WARN] There are no
ConnectionStateListeners registered.
2014-07-10 11:00:16 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.net.ConnectException: Connection refused
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_30]
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_30]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
~[na:1.6.0_30]
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
~[stormjar.jar:na]
        at
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
~[stormjar.jar:na]
        at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
~[stormjar.jar:na]
        at
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
~[stormjar.jar:na]
        at
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77)
~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
~[stormjar.jar:na]
        at
storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
~[stormjar.jar:na]
        at
storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
2014-07-10 11:00:16 b.s.d.executor [ERROR]

I've specified in simple test topology that I want only one worker, so what
happens after this error, supervisor will boot up another woorker, which
fails with same error after ~ minute of running.

What is this about, and how can I fix it? (The setup is the same: the default
wirbelsturm cluster running on a 64-bit Ubuntu machine.)

By the way, here is the new test topology, using the opaque Kafka spout (the
same thing happens with the transactional one too):


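import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TridentKafkaOpaqueDeployer {

    // Prints every incoming tuple; it emits nothing downstream.
    public static class PrinterBolt extends BaseFunction {
        private static final long serialVersionUID = -5585127152942983256L;

        @Override
        public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String[] args) throws Exception {

        // Broker metadata is looked up through Zookeeper.
        BrokerHosts brokerHosts = new ZkHosts("zookeeper1");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "tridentOpaqueTest");

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//        kafkaConfig.forceFromStart = true;

        TridentTopology topology = new TridentTopology();

        topology
            .newStream("tridentTestOpaqueSpout", new OpaqueTridentKafkaSpout(kafkaConfig))
            .name("tridentTestOpaqueSpout")
            .each(new Fields("str"), new PrinterBolt(), new Fields("print"));

        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
        config.setNumWorkers(1);
        config.setMaxSpoutPending(3);

        // args[0] is the topology name.
        StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
    }
}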


Note: I also resolved the issue with reusing the spout name across topologies,
as per the recommendation Nathan gave some time ago:
https://groups.google.com/forum/#!topic/storm-user/Tn43K1eGcKY




On Wed, Jul 9, 2014 at 9:49 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:

> Very strange. Could you try deleting Trident's data in Zookeeper:
>
> $ sh zkCli.sh
> rmr /transactional
>
> and then resubmitting the topology and repeating your test scenario?
>
> Maybe the spout's data in Zookeeper somehow got corrupted because you
> are setting forceFromStart in the spout and resubmitting the topology
> multiple times. I think the transactional topology may be left in an
> undefined state in that case.
>
> You could also enable the LoggingMetricsConsumer in storm.yaml, and then
> check the Kafka spout's kafka.latestOffset metric in metrics.log, and
> compare this offset with the one Kafka's own utility script outputs
> (search under kafka/bin/ for the script).
>

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Very strange. Could you try deleting Trident's data in Zookeeper:

$ sh zkCli.sh
rmr /transactional

and then resubmitting the topology and repeating your test scenario?

Maybe the spout's data in Zookeeper somehow got corrupted because you
are setting forceFromStart in the spout and resubmitting the topology
multiple times. I think the transactional topology may be left in an
undefined state in that case.

You could also enable the LoggingMetricsConsumer in storm.yaml, and then
check the Kafka spout's kafka.latestOffset metric in metrics.log, and
compare this offset with the one Kafka's own utility script outputs
(search under kafka/bin/ for the script).
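
The comparison of the two offsets can be sketched in plain Java. This is only an illustration: the class name OffsetLagCheck and the sample numbers are hypothetical, and both values are assumed to be read by hand (one from metrics.log, the other from the output of Kafka's own script).

```java
public class OffsetLagCheck {

    // Difference between the latest offset reported by Kafka's own tooling
    // and the latest offset the spout reports in metrics.log.
    public static long lag(long brokerLatestOffset, long spoutLatestOffset) {
        if (brokerLatestOffset < 0 || spoutLatestOffset < 0) {
            throw new IllegalArgumentException("offsets must be non-negative");
        }
        return brokerLatestOffset - spoutLatestOffset;
    }

    // Turns the difference into a short, human-readable verdict.
    public static String describe(long lag) {
        if (lag == 0) {
            return "in sync";
        }
        if (lag > 0) {
            return "spout is behind by " + lag + " message(s)";
        }
        return "spout reports a higher offset than the broker";
    }

    public static void main(String[] args) {
        long brokerLatest = 12L; // hypothetical value from Kafka's script
        long spoutLatest = 5L;   // hypothetical value from metrics.log
        System.out.println(describe(lag(brokerLatest, spoutLatest)));
    }
}
```

If the difference keeps growing while new messages are produced, the spout is probably not seeing the new data at all, which would match the symptom described in this thread.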

On Wednesday, July 9, 2014, Miloš Solujić <mi...@gmail.com> wrote:

> Yep, I double-checked.
>
> Here is how it's done:
>
> #create topic
> /opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
> --replication-factor 1 --partition 1 --topic scores
>
> #check what is created
> /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
> --topic scores
>
> #produce few messages
> /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
> --topic scores
>
> #consumer
> /opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
> --topic scores --from-beginning
>
>
>
>
> > try enabling Config.setDebug(true) and monitoring the Kafka spout's
> > activity in the logs.
> I did that; only tick tuples are shipped around, nothing else.
>
> > Also, you should paste all your worker logs (worker-*.log files).
> I forgot to mention that only one worker is set, precisely to simplify
> things.
>
>
>
> Here is a simplified version of this topology (no Trident state, only a
> simple printer bolt):
>
>
> public class TridentKafkaDeployer {
>
>     public static class PrinterBolt extends BaseFunction {
>         private static final long serialVersionUID = -5585127152942983256L;
>
>         @Override
>         public void execute(TridentTuple tuple, TridentCollector
> tridentCollector) {
>             System.out.println(tuple.toString());
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         BrokerHosts zk = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk,
> "scores");
>
>         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>         kafkaConfig.forceFromStart = true;
>
>         TridentTopology topology = new TridentTopology();
>
>         topology
>             .newStream("raw-scores", new
> TransactionalTridentKafkaSpout(kafkaConfig))
>                     .name("kafkaSpout")
>             .each(new Fields("str"), new PrinterBolt(),
>                     new Fields("print"));
>
>
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
> 2000);
>         config.setNumWorkers(1);
>         config.setMaxSpoutPending(3);
>
>         StormSubmitter.submitTopologyWithProgressBar(args[0], config,
> topology.build());
>     }
> }
>
> Exactly the same behaviour (it reads from exactly the same Kafka topic):
> fresh messages in the topic are not picked up.
>
>
>
>
>

-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Kafka trident getting stuck

Posted by Miloš Solujić <mi...@gmail.com>.
Yep, I double-checked.

Here is how it's done:

#create topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper1:2181
--replication-factor 1 --partition 1 --topic scores

#check what is created
/opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper1:2181 --describe
--topic scores

#produce few messages
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092
--topic scores

#consumer
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181
--topic scores --from-beginning




> try enabling Config.setDebug(true) and monitoring the Kafka spout's
> activity in the logs.
I did that; only tick tuples are shipped around, nothing else.

> Also, you should paste all your worker logs (worker-*.log files).
I forgot to mention that only one worker is set, precisely to simplify
things.



Here is a simplified version of this topology (no Trident state, only a
simple printer bolt):


public class TridentKafkaDeployer {

    public static class PrinterBolt extends BaseFunction {
        private static final long serialVersionUID = -5585127152942983256L;

        @Override
        public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
            System.out.println(tuple.toString());
        }
    }

    public static void main(String[] args) throws Exception {

        BrokerHosts zk = new ZkHosts("zookeeper1");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zk, "scores");

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConfig.forceFromStart = true;

        TridentTopology topology = new TridentTopology();

        topology
            .newStream("raw-scores", new TransactionalTridentKafkaSpout(kafkaConfig))
            .name("kafkaSpout")
            .each(new Fields("str"), new PrinterBolt(), new Fields("print"));

        Config config = new Config();
        config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);
        config.setNumWorkers(1);
        config.setMaxSpoutPending(3);

        StormSubmitter.submitTopologyWithProgressBar(args[0], config, topology.build());
    }
}

Exactly the same behaviour (it reads from exactly the same Kafka topic):
fresh messages in the topic are not picked up.





On Tue, Jul 8, 2014 at 7:08 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:

> Also, you should paste all your worker logs (worker-*.log files).
>
>

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Also, you should paste all your worker logs (worker-*.log files).

On Tuesday, July 8, 2014, Danijel Schiavuzzi <da...@schiavuzzi.com> wrote:

> I'd double check the Kafka producer to make sure those messages are really
> getting into the right Kafka topic. Also,
> try enabling Config.setDebug(true) and monitoring the Kafka spout's
> activity in the logs. setMaxSpoutPending should always be set, as by
> default it is unset, so you risk internal queue explosion.
>


-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
I'd double-check the Kafka producer to make sure those messages are really
getting into the right Kafka topic. Also, try enabling debug output with
Config.setDebug(true) and monitoring the Kafka spout's activity in the logs.
setMaxSpoutPending should always be set: it is unset by default, so you
risk the internal queues growing without bound.
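
A minimal sketch of the two settings mentioned above. Storm's Config is a
HashMap, so to stay runnable without the Storm jar this uses a plain map and
the literal keys that Config.setDebug and Config.setMaxSpoutPending are
assumed to write ("topology.debug" and "topology.max.spout.pending"). The
class name DebugConfigSketch is made up for illustration, and the value 10 is
the one suggested elsewhere in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class DebugConfigSketch {

    // Builds a topology config map with debug logging on and a bounded
    // number of pending batches (in Trident the limit counts batches).
    static Map<String, Object> build(int maxSpoutPending) {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("topology.debug", true);                        // assumed key behind Config.setDebug(true)
        conf.put("topology.max.spout.pending", maxSpoutPending); // assumed key behind setMaxSpoutPending(n)
        return conf;
    }

    public static void main(String[] args) {
        // Print the resulting settings; in a real topology these would be
        // set on backtype.storm.Config before submitting.
        System.out.println(build(10));
    }
}
```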

On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com> wrote:

> Yep. pretty much sure. Via internal kafka-producer.sh
> same method is used to produce initial messages (before first launch of
> topology, that got consumed and processed just fine)
>
> as for maxSpoutPending first I tried with 10, then removed it (left
> default value)

-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Kafka trident getting stuck

Posted by Miloš Solujić <mi...@gmail.com>.
Yep, pretty sure. I'm using the internal kafka-producer.sh, the same method
I used to produce the initial messages (before the first launch of the
topology), which got consumed and processed just fine.

As for maxSpoutPending, I first tried 10, then removed it (left the default
value).


On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:

> Are you sure you are producing new messages into the same Kafka
> topic? What number did you set maxSpoutPending to?

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Are you sure you are producing new messages into the same Kafka topic? What
number did you set maxSpoutPending to?

On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com> wrote:

> Thanks Danijel for your quick proposition.
>
> I tried lowering down and removing all performance settings (those were
> left from load testing on one machine)
>
> Still same result: no matter what, new messages are not taken from kafka
> after topology is redeployed.

-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Re: Kafka trident getting stuck

Posted by Miloš Solujić <mi...@gmail.com>.
Thanks, Danijel, for your quick suggestion.

I tried lowering and then removing all the performance settings (those were
left over from load testing on one machine).

Still the same result: no matter what, new messages are not taken from Kafka
after the topology is redeployed.


On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <da...@schiavuzzi.com>
wrote:

> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
> like in plain Storm. Too high values may cause blockages like the one you
> describe.

Re: Kafka trident getting stuck

Posted by Danijel Schiavuzzi <da...@schiavuzzi.com>.
Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In
Trident, setMaxSpoutPending refers to the number of batches, not tuples as
in plain Storm. Values that are too high may cause blockages like the one
you describe.
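
To put rough numbers on the difference: the sketch below is plain
arithmetic, not Storm API. It assumes that each pending Trident batch can
pull up to fetchSizeBytes from each Kafka partition, and that the 'scores'
topic has a single partition; both are assumptions, and the class and method
names are made up for illustration. fetchSizeBytes = 10000 is taken from the
original post.

```java
public class PendingEstimate {

    // Rough upper bound on bytes in flight:
    // pending batches x partitions x bytes fetched per batch per partition.
    static long maxInFlightBytes(long maxSpoutPending, long partitions, long fetchSizeBytes) {
        return maxSpoutPending * partitions * fetchSizeBytes;
    }

    public static void main(String[] args) {
        long fetchSizeBytes = 10000; // kafkaConf.fetchSizeBytes from the original post
        long partitions = 1;         // assumption: single-partition topic

        // Setting from the original post versus the suggested value.
        System.out.println("pending=100000: "
                + maxInFlightBytes(100000, partitions, fetchSizeBytes) + " bytes");
        System.out.println("pending=10: "
                + maxInFlightBytes(10, partitions, fetchSizeBytes) + " bytes");
    }
}
```

Under these assumptions the original setting allows on the order of a
gigabyte of fetched data to be pending per partition, while a value of 10
caps it at roughly 100 KB.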

On Tuesday, July 8, 2014, Miloš Solujić <mi...@gmail.com> wrote:

> Hi all,
>
> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope that my question
> is not too dumb. Here it goes:
>
> I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.
>
> I've set up a test cluster using the Wirbelsturm tool with unchanged YAML
> (just uncommented the Kafka machine).
>
> Here is the config snippet for my Trident topology:
>
>         BrokerHosts zk = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>
>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>         kafkaConf.fetchSizeBytes = 10000;
>         kafkaConf.forceFromStart = true;
>
>         Config stormConfig = new Config();
>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>         // performance settings
>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>         stormConfig.setMaxSpoutPending(100000);
>
>         if (args != null && args.length > 0) {
>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>                     BuildTridentScoreTopology.build(kafkaConf));
>         } else {...}
>
> Now, I've created the 'scores' topic in Kafka and pushed a few test
> messages prior to starting the topology, with kafkaConf.forceFromStart =
> true. The topology processed those messages just fine and stored them in
> tridentState (Couchbase).
>
> All new messages are simply ignored!
>
> After redeploying the topology (both with forceFromStart = true and
> forceFromStart = false), no more messages are ingested from Kafka.
>
> Here is the worker log for one topology deployment and a short run:
> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>
> These are the VMs that host this Storm cluster:
> 10.0.0.241 zookeeper1
> 10.0.0.101 supervisor1
> 10.0.0.21 kafka1
> 10.0.0.251 nimbus1
>
> Thanks,
> Milos

-- 
Danijel Schiavuzzi

E: danijel@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7