You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bookkeeper.apache.org by "Sijie Guo (Commented) (JIRA)" <ji...@apache.org> on 2011/10/26 12:21:32 UTC

[jira] [Commented] (BOOKKEEPER-77) Add a console client for hedwig

    [ https://issues.apache.org/jira/browse/BOOKKEEPER-77?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13135850#comment-13135850 ] 

Sijie Guo commented on BOOKKEEPER-77:
-------------------------------------

implementation of hedwig console:

* pub
** use hedwigclient.getPublisher().publish(...) 

* sub
** use hedwigclient.getSubscriber().subscribe() 

* closesub
** use hedwigclient.getSubscriber().closeSubscription() 

* unsub
** use hedwigclient.getSubscriber().unsubscribe() 

* consume/consumeto
** use hedwigclient.getSubscriber().consume() 

* pubsub
*# sub <topic> as subscriber <subscriber_id_prefix>_<cur_time> .
*# subscriber <subscriber_id_prefix>_<cur_time> will wait a message until <timeout_secs> secs.
*# publish a message <message_prefix>_<cur_time> to topic <topic> .
*# when subscriber <subscriber_id_prefix> receive the message, it will check the message is the published message
*# received message or timeout, subscriber <subscriber_id_prefix> will unsubscribe the <topic>
*# quit 

* describe topic
*# check whether the topic existed or not : zk.exists("/hedwig/<region>/topics/<topic>").
*## if existed, go next
*## else return 
*# read the topic owner from zookeeper : /hedwig/<region>/topics/<topic>/owner
*# read the persistent information from zookeeper : /hedwig/<region>/topics/<topic>/ledgers
*## for each ledger, parse its data to get its start message id and end message id
*### if the ledger is not closed (which means the ledger is used for persist messages not), we use bookkeeper client to openLedgerNoRecovery to read the lastConfirmed message id 
*# read the subscription information from zookeeper : /hedwig/<region>/topics/<topic>/subscribers
*## for each subscriber, parse its data to get the subscription state 

* readtopic
*# check whether the topic existed or not : zk.exists("/hedwig/<region>/topics/<topic>").
*# read the persistent information from zookeeper
*# read the subscription information from zookeeper to get the <least_consumed_message_id>
*# find <start_message_id> by MAX( <least_consumed_message_id>, <start_message_id> )
*# loop over the ledgers used for persisting messsages
*## for each ledger, if the messages are all less than <start_message_id>, skip the ledger
*## otherwise, open ledger by calling openLedgerNoRecovery (just for reading) to read the messages and format them 

* show hubs
* fetch list of hosts from zookeeper /hedwig/<region>/available , for each host read its data and convert it to integer. 

* show topics
** fetch list of topics from zookeeper /hedwig/<region>/topics 
                
> Add a console client for hedwig
> -------------------------------
>
>                 Key: BOOKKEEPER-77
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-77
>             Project: Bookkeeper
>          Issue Type: New Feature
>          Components: hedwig-client
>            Reporter: Sijie Guo
>         Attachments: bookkeeper-77-draft.patch
>
>
> implement a console client to use/admin hedwig system.
> Usage : hedwig_console [options] COMMAND [argument ...]
> (if no COMMAND specified, hedwig_console will enter interactive mode.)
> OPTIONS:
> {quote}
>    --zkquorums            the quorum list of zookeeper cluster
>    --zktimeout            timeout of zookeeper client
>    --zk_hedwig_prefix     the prefix of zookeeper path to store hedwig metadata
>    --region               which region of hedwig to connect
>    --consume_interval     the consume interval of hub server
> {quote}
> COMMANDS:
> * pub <topic> <message>
> ** Publish <message> to the specified <topic>. 
> * sub <topic> <subscriber_id> [mode]
> ** Subscribe the specified <topic> as subscriber <subscriber_id>. (NOTE: only run in INTERACTIVE mode now) 
> ** mode: subscription mode. available values are 0, 1, 2.
> *** 0 = CREATE : create the subscription if not subscription before.
> *** 1 = ATTACH (default) : attach the subscription
> *** 2 = CREATE_OR_ATTACH : if the subscription is not existed, create the subscription then attach. 
> * closesub <topic> <subscriber_id>
> ** Close subscription of subscriber <subscriber_id>. (NOTE: it just close the subscription connection and do cleanup work in client-side, without REMOVING subscription state from server side) 
> * unsub <topic> <subscriber_id>
> ** Remove subscription state of subscriber <subscriber_id>. the subscription state of subscriber <subscriber_id> will be removed from server side. 
> * consume <topic> <subscriber_id> <num_messages_to_consume>
> ** Move the subscription ptr of subscriber <subscriber_id> from ptr to ptr + num_messages_to_consume. 
> * consumeto <topic> <subscriber_id> <message_id>
> ** Move the subscription ptr of subscriber <subscriber_id> from ptr to <message_id>. 
> ** NOTE: consume*/*consumeto just sent consume request to hub server and hub server move the subscription ptr in its memory. Hub server lazily persists the subscription ptr to zookeeper. the default persist interval in hub server is 50 messages. so use DESCRIBE TOPIC to show subscription, the subscription ptr might be not changed. 
> * pubsub <topic> <subscriber_id_prefix> <timeout_secs> <message_prefix>
> ** A test command to test healthy of hedwig cluster. 
> *# sub <topic> as subscriber <subscriber_id_prefix>_<cur_time> .
> *# subscriber <subscriber_id_prefix>_<cur_time> will wait a message until <timeout_secs> secs.
> *# publish a message <message_prefix>_<cur_time> to topic <topic> .
> *# when subscriber <subscriber_id_prefix> receive the message, it will check the message is the published message
> *# received message or timeout, subscriber <subscriber_id_prefix> will unsubscribe the <topic>
> *# quit
> {quote}
>       [hedwig: (standalone) 7] pubsub ttttttttt test 10 test_message
>       Starting PUBSUB test ...
>       Sub topic ttttttttt, subscriber id test-1319602021044
>       Pub topic ttttttttt : test_message-1319602021044
>       Received message : test_message-1319602021044
>       PUBSUB SUCCESS. TIME: 43 MS
>       SUCCESS. Finished 0.058 s
> {quote}  
> * show hubs
> ** list all available hub servers. including hostname and how many topics the server owns. 
> {quote}
>       Example:
>       Available Hub Servers:
>               98.137.99.27:9875:9876 :        2
> {quote}
>          
> * show topics
> ** list all existing topics. (NOTE: since we fetch topic lists from zookeeper, we may got PacketLenException when we have millions of topics. it doesn't affect system, just can't display the topic list) 
> * describe topic <topic>
> ** show state of a specified topic, including topic owner, topic persistent information, topic subscriber list and their subscription states. 
> {quote}
>       Example:
>       ===== Topic Information : ttttt =====
>       Owner : 98.137.99.27:9875:9876
>       >>> Persistence Info <<<
>       Ledger 54729 [ 1 ~ 59 ]
>       Ledger 54731 [ 60 ~ 60 ]
>       >>> Subscription Info <<<
>       Subscriber mysub : consumeSeqId: local:50
> {quote}
>          
> * readtopic <topic> [start_msg_id]
> ** read messages of a specified <topic>. 
> *** no <start_msg_id> specified : readtopic will start from <least_consumed_message_id> + 1 of its subscribers. in above exmaple, "readtopic ttttt" will start from 50. if there is no subscription, it will start from 1.
> *** <start_msg_id> specified : since messages consumed will be removed by garbage collection. so readtopic tries to not read consumed message, it will start from MAX( <start_msg_id> , <least_consumed_message_id> ). 
> *** Message Format
> **** MsgId : include two parts: first part is which region the message is published from, second part is message id.
> **** SrcRegion : region name
> **** Message : the message body
> {quote}
>             ---------- MSGID=LOCAL(51) ----------
>             MsgId:     LOCAL(51)
>             SrcRegion: standalone
>             Message:
>             hello
> {quote}         
> * history
> ** list history commands 
> * redo [<cmdno>|!]
> ** redo the specified command by command no. (NOTE: "*redo *" means redo the previous command) 
> * help
> ** print help information 
> * quit|exit
> ** exit the interactive console 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira