Posted to user@storm.apache.org by Eugene <ed...@gmail.com> on 2014/08/07 16:41:19 UTC
location of OpaqueTridentKafkaSpout offset in Zookeeper
Hi,
I have a topology that is stuck processing messages, and to troubleshoot it
I want to view or remove the offset in ZooKeeper. The problem is that I
can't find any offset for my topic/client. I checked all the ZooKeeper nodes
and it's not there. Where should OpaqueTridentKafkaSpout keep its offset?
my topology:
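import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentTopology;
BrokerHosts brokerHosts = new ZkHosts(".....");
TridentTopology topology = new TridentTopology();
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
brokerHosts,
"batch_processing_events",
"batch_processing_event_client");
// EventStringScheme is an application-specific Scheme, not part of storm-kafka
spoutConfig.scheme = new SchemeAsMultiScheme(new EventStringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConfig);
// "offlineEvents" is the txId argument of newStream()
Stream inputStream = topology.newStream("offlineEvents", spout);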
In ZooKeeper I am looking for "batch_processing_event_client", but it does not
exist under /consumers or /transactional:
ls /
[transactional, clusterstate.json, consumers, storm, controller_epoch, configs, admin, zookeeper, aliases.json, config, controller, live_nodes, overseer, brokers, collections, overseer_elect]
[zk: localhost:2181(CONNECTED) 1] ls /consumers
[fullindexer, kafkatest, testgroup, deltaindexer, search-content-loader, mytest]
[zk: localhost:2181(CONNECTED) 2] ls /transactional
[offlineEvents]
Thanks
Eugene.
re: location of OpaqueTridentKafkaSpout offset in Zookeeper
Posted by "shengyi.pan" <sh...@gmail.com>.
Hi,
I have installed Storm 0.9.2 and was also confused by this question for a long time. My understanding is summarized as follows:
Storm uses the transactional.zookeeper.* settings to store the OpaqueTridentKafkaSpout offsets in ZooKeeper.
You can find the default configuration at: https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml
for example: transactional.zookeeper.root: "/transactional"
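The OpaqueTridentKafkaSpout ZooKeeper path is /transactional/spoutName, where spoutName is the id passed to topology.newStream() (offlineEvents in Eugene's topology), and the offset metadata is stored in /transactional/spoutName/user/partition_*/number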
The znode name (the number) increases continuously (I don't know why the offset metadata is updated while no data is being processed in Storm...).
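Based on the layout described above, a minimal Java sketch of building the per-partition path and picking the most recent numbered child might look like this. It assumes the default root /transactional and assumes the highest-numbered child holds the latest metadata, which is consistent with the number only ever increasing. The class and method names are hypothetical and not part of Storm, and the child names in main() are illustrative values only.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of Storm: builds the ZooKeeper paths described above.
public final class TridentOffsetPaths {
    private TridentOffsetPaths() {}

    // root: value of transactional.zookeeper.root (assumed default "/transactional")
    // streamId: the name under the root, e.g. "offlineEvents"
    public static String partitionPath(String root, String streamId, int partition) {
        return root + "/" + streamId + "/user/partition_" + partition;
    }

    // Returns the numerically largest child name. Compares as numbers, not strings,
    // so "42" beats "9". Assumes every child name is a decimal number.
    public static String latestChild(List<String> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("no numbered children");
        }
        return Collections.max(children, Comparator.comparingLong(Long::parseLong));
    }

    public static void main(String[] args) {
        String dir = partitionPath("/transactional", "offlineEvents", 0);
        // Illustrative children, as "ls" on that path might return them
        List<String> children = Arrays.asList("41", "42", "9");
        System.out.println(dir + "/" + latestChild(children));
    }
}
```

Running main() prints /transactional/offlineEvents/user/partition_0/42. The numeric comparison matters because a plain string comparison would pick "9" over "42".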
If you get the znode value, you will see:
{"topic":"flume-topic","partition":1,"instanceId":"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34","nextOffset":70,"topology":{"name":"test-kafka-trident","id":"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34"},"broker":{"host":"192.168.198.192","port":9092},"offset":70}
The znode value is the Map data built in the TridentKafkaEmitter class.
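Since the JDK has no built-in JSON parser, the following is a minimal regex-based Java sketch for pulling a numeric field such as nextOffset or offset out of a znode value like the one above. A regex is a stand-in for a real JSON library here and only handles simple "name":number pairs. The class and method names are hypothetical.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Storm: extracts a numeric field from the
// partition metadata JSON stored in the znode. Regex-based, so it only handles
// simple "name":number pairs, not general JSON.
public final class OffsetMeta {
    private OffsetMeta() {}

    public static OptionalLong numericField(String json, String field) {
        // Matches "field" followed by a colon and an integer. The quotes around the
        // name keep "offset" from matching inside "nextOffset".
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // The znode value quoted above
        String json = "{\"topic\":\"flume-topic\",\"partition\":1,"
            + "\"instanceId\":\"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34\",\"nextOffset\":70,"
            + "\"topology\":{\"name\":\"test-kafka-trident\",\"id\":\"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34\"},"
            + "\"broker\":{\"host\":\"192.168.198.192\",\"port\":9092},\"offset\":70}";
        System.out.println("partition=" + numericField(json, "partition").getAsLong()
            + " offset=" + numericField(json, "offset").getAsLong()
            + " nextOffset=" + numericField(json, "nextOffset").getAsLong());
    }
}
```

For the znode value above this prints partition=1 offset=70 nextOffset=70. A missing field yields an empty OptionalLong rather than an exception.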
I still have two questions:
1. Why does the znode name (/transactional/spoutName/user/partition_*/number) increase continuously even when no data is being processed in Storm?
2. Which Storm class performs the OpaqueTridentKafkaSpout's offset updates?
2014-08-15
shengyi.pan