Posted to user@storm.apache.org by Eugene <ed...@gmail.com> on 2014/08/07 16:41:19 UTC

location of OpaqueTridentKafkaSpout offset in Zookeeper

Hi,
I have a topology that is stuck processing messages, and in order to
troubleshoot I want to view or remove its offset in ZooKeeper. The problem is
that I can't find any offset for my topic / client. I checked all ZooKeeper
nodes and it's not there. Where does OpaqueTridentKafkaSpout keep its offset?

my topology:

BrokerHosts brokerHosts = new ZkHosts(".....");
TridentTopology topology = new TridentTopology();
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
        brokerHosts,
        "batch_processing_events",
        "batch_processing_event_client");
spoutConfig.scheme = new SchemeAsMultiScheme(new EventStringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConfig);
Stream inputStream = topology.newStream("offlineEvents", spout);

In ZooKeeper I am looking for "batch_processing_event_client", but it does
not exist under /consumers or /transactional:

ls /
[transactional, clusterstate.json, consumers, storm, controller_epoch,
configs, admin, zookeeper, aliases.json, config, controller, live_nodes,
overseer, brokers, collections, overseer_elect]

[zk: localhost:2181(CONNECTED) 1] ls /consumers
[fullindexer, kafkatest, testgroup, deltaindexer, search-content-loader,
mytest]

[zk: localhost:2181(CONNECTED) 2] ls /transactional
[offlineEvents]

Thanks
Eugene.

re: location of OpaqueTridentKafkaSpout offset in Zookeeper

Posted by "shengyi.pan" <sh...@gmail.com>.
Hi,

I have installed Storm 0.9.2 and was also confused by this question for a long time. My understanding is summarized as follows:

Storm uses the transactional.zookeeper.* settings to decide where the OpaqueTridentKafkaSpout offset is stored in ZooKeeper.
You can find the default configuration at: https://github.com/apache/incubator-storm/blob/master/conf/defaults.yaml

For example: transactional.zookeeper.root: "/transactional"

The OpaqueTridentKafkaSpout ZooKeeper path is /transactional/spoutName (in your listing, spoutName is "offlineEvents", the name passed to newStream),
and the offset metadata is stored in /transactional/spoutName/user/partition_*/number
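
As a rough illustration of the layout described above, the following sketch assembles the partition-level znode path from the configured root, the name passed to newStream, and a partition number. TridentOffsetPaths and partitionPath are hypothetical helper names, not part of Storm, and it assumes that the "*" in partition_* stands for the Kafka partition number. The trailing "number" znode is left out because its name changes over time.

```java
// Sketch only: TridentOffsetPaths is a hypothetical helper, not a Storm class.
final class TridentOffsetPaths {
    private TridentOffsetPaths() {}

    /**
     * Builds <root>/<streamName>/user/partition_<n>.
     * Assumption: the "*" in partition_* is the Kafka partition number.
     */
    static String partitionPath(String root, String streamName, int partition) {
        // Drop a trailing slash on the root so the result has no "//".
        String cleanRoot = root.endsWith("/")
                ? root.substring(0, root.length() - 1)
                : root;
        return cleanRoot + "/" + streamName + "/user/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values taken from this thread: default root and the stream name
        // passed to topology.newStream(...).
        System.out.println(partitionPath("/transactional", "offlineEvents", 0));
        // prints /transactional/offlineEvents/user/partition_0
    }
}
```

The resulting path can then be listed in the ZooKeeper shell with ls to find the current "number" child.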

The znode name (number) increases continuously (I don't know why the offset metadata is updated while no data is being processed in Storm).
If you read the znode value, you will see:

{"topic":"flume-topic","partition":1,"instanceId":"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34","nextOffset":70,"topology":{"name":"test-kafka-trident","id":"9c0573f9-bf4f-46b4-93c6-4603dd4ecd34"},"broker":{"host":"192.168.198.192","port":9092},"offset":70}

The znode value is the Map data built in the TridentKafkaEmitter class.
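
For troubleshooting, it may be enough to pull the "offset" and "nextOffset" numbers out of such a value. The sketch below does that with a regular expression from the Java standard library, so it needs no JSON dependency. ZnodeOffsets and longField are hypothetical names, the sample string is a trimmed copy of the value shown above, and the lookup simply returns the first matching key without regard to nesting, which is an assumption that holds for this value but not for JSON in general.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: ZnodeOffsets is a hypothetical helper, not a Storm class.
final class ZnodeOffsets {
    private ZnodeOffsets() {}

    /**
     * Returns the integer value of the first "key":<number> pair found in json.
     * This is a plain text search, not a real JSON parser.
     */
    static long longField(String json, String key) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no numeric field named " + key);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        // Trimmed copy of the znode value quoted in this thread.
        String value = "{\"topic\":\"flume-topic\",\"partition\":1,"
                + "\"nextOffset\":70,\"offset\":70}";
        System.out.println("offset=" + longField(value, "offset")
                + " nextOffset=" + longField(value, "nextOffset"));
        // prints offset=70 nextOffset=70
    }
}
```

Because the pattern includes the opening quote of the key, searching for "offset" does not accidentally match inside "nextOffset".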

I still have two questions:

1. Why does the znode name (/transactional/spoutName/user/partition_*/number) keep increasing even when no data is being processed in Storm?

2. Which Storm class updates the OpaqueTridentKafkaSpout offset?


2014-08-15



shengyi.pan


