You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:28 UTC

[18/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
deleted file mode 100644
index 407769b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.LinkedHashMap;
-
-/**
- * List all the available commands
- */
-public final class HedwigCommands {
-
-    static final String[] EMPTY_ARRAY = new String[0];
-
-    //
-    // List all commands used to play with hedwig
-    //
-
-    /* PUB : publish a message to hedwig */
-    static final String PUB = "pub";
-    static final String PUB_DESC = "Publish a message to a topic in Hedwig";
-    static final String[] PUB_USAGE = new String[] {
-        "usage: pub {topic} {message}",
-        "",
-        "  {topic}   : topic name.",
-        "              any printable string without spaces.",
-        "  {message} : message body.",
-        "              remaining arguments are used as message body to publish.",
-    };
-
-    /* SUB : subscribe to a topic in hedwig for a specified subscriber */
-    static final String SUB = "sub";
-    static final String SUB_DESC = "Subscribe a topic for a specified subscriber";
-    // Help text for the sub command. [receive] is positional, so it can only
-    // be supplied after [mode].
-    static final String[] SUB_USAGE = new String[] {
-        "usage: sub {topic} {subscriber} [mode] [receive]",
-        "",
-        "  {topic}      : topic name.",
-        "                 any printable string without spaces.",
-        "  {subscriber} : subscriber id.",
-        "                 any printable string without spaces.",
-        "  [mode]       : mode to create subscription.",
-        "  [receive]    : bool. whether to start delivery to receive messages.",
-        "                 (default value is true)",
-        "",
-        "  available modes: (default value is 1)",
-        "    0 = CREATE: create subscription.",
-        "                if the subscription already exists, it will fail.",
-        "    1 = ATTACH: attach to an existing subscription.",
-        "                if the subscription does not exist, it will fail.",
-        "    2 = CREATE_OR_ATTACH:",
-        "                attach to subscription, if it does not exist create one."
-    };
-
-    /* CLOSESUB : close the subscription of a subscriber for a topic */
-    static final String CLOSESUB = "closesub";
-    static final String CLOSESUB_DESC = "Close subscription of a subscriber to a specified topic";
-    static final String[] CLOSESUB_USAGE = new String[] {
-        "usage: closesub {topic} {subscriber}",
-        "",
-        "  {topic}      : topic name.",
-        "                 any printable string without spaces.",
-        "  {subscriber} : subscriber id.",
-        "                 any printable string without spaces.",
-        "",
-        " NOTE: this command just cleanup subscription states on client side.",
-        "       You can try UNSUB to clean subscription states on server side.",
-    };
-
-    /* UNSUB: unsubscribe of a subscriber to a topic */
-    static final String UNSUB = "unsub";
-    static final String UNSUB_DESC = "Unsubscribe a topic for a subscriber";
-    static final String[] UNSUB_USAGE = new String[] {
-        "usage: unsub {topic} {subscriber}",
-        "",
-        "  {topic}      : topic name.",
-        "                 any printable string without spaces.",
-        "  {subscriber} : subscriber id.",
-        "                 any printable string without spaces.",
-        "",
-        " NOTE: this command will cleanup subscription states on server side.",
-        "       You can try CLOSESUB to just clean subscription states on client side.",
-    };
-
-    static final String RMSUB = "rmsub";
-    static final String RMSUB_DESC = "Remove subscriptions for topics";
-    static final String[] RMSUB_USAGE = new String[] {
-        "usage: rmsub {topic_prefix} {start_topic} {end_topic} {subscriber_prefix} {start_sub} {end_sub}",
-        "",
-        "  {topic_prefix}       : topic prefix.",
-        "  {start_topic}        : start topic id.",
-        "  {end_topic}          : end topic id.",
-        "  {subscriber_prefix}  : subscriber prefix.",
-        "  {start_sub}          : start subscriber id.",
-        "  {end_sub}            : end subscriber id.",
-    };
-
-    /* CONSUME: move consume ptr of a subscription with specified steps */
-    static final String CONSUME = "consume";
-    static final String CONSUME_DESC = "Move consume ptr of a subscription with specified steps";
-    // Help text for the consume command.
-    static final String[] CONSUME_USAGE = new String[] {
-        "usage: consume {topic} {subscriber} {nmsgs}",
-        "",
-        "  {topic}      : topic name.",
-        "                 any printable string without spaces.",
-        "  {subscriber} : subscriber id.",
-        "                 any printable string without spaces.",
-        "  {nmsgs}      : how many messages to move consume ptr.",
-        "",
-        "  Example:",
-        "  suppose, from zk we know subscriber B consumed topic T to message 10",
-        "  [hedwig: (standalone) 1] consume T B 2",
-        "  after executed above command, a consume(10+2) request will be sent to hedwig.",
-        "",
-        "  NOTE:",
-        "  since Hedwig updates subscription consume ptr lazily, so you need to know that",
-        "    1) the consumption ptr read from zookeeper may be stale; ",
-        "    2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
-    };
-
-    /* CONSUMETO: move consume ptr of a subscription to a specified pos */
-    static final String CONSUMETO = "consumeto";
-    static final String CONSUMETO_DESC = "Move consume ptr of a subscription to a specified message id";
-    // Help text for the consumeto command.
-    static final String[] CONSUMETO_USAGE = new String[] {
-        "usage: consumeto {topic} {subscriber} {msg_id}",
-        "",
-        "  {topic}      : topic name.",
-        "                 any printable string without spaces.",
-        "  {subscriber} : subscriber id.",
-        "                 any printable string without spaces.",
-        "  {msg_id}     : message id that consume ptr will be moved to.",
-        "                 if the message id is less than current consume ptr,",
-        "                 hedwig will do nothing.",
-        "",
-        "  Example:",
-        "  suppose, from zk we know subscriber B consumed topic T to message 10",
-        "  [hedwig: (standalone) 1] consumeto T B 12",
-        "  after executed above command, a consume(12) request will be sent to hedwig.",
-        "",
-        "  NOTE:",
-        "  since Hedwig updates subscription consume ptr lazily, so you need to know that",
-        "    1) the consumption ptr read from zookeeper may be stale; ",
-        "    2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
-    };
-
-    /* PUBSUB: a healthy checking command to ensure cluster is running */
-    static final String PUBSUB = "pubsub";
-    static final String PUBSUB_DESC = "A healthy checking command to ensure hedwig is in running state";
-    // Help text for the pubsub command; the example message name is spelled
-    // the same way in the sample invocation and in the step list.
-    static final String[] PUBSUB_USAGE = new String[] {
-        "usage: pubsub {topic} {subscriber} {timeout_secs} {message}",
-        "",
-        "  {topic}        : topic name.",
-        "                   any printable string without spaces.",
-        "  {subscriber}   : subscriber id.",
-        "                   any printable string without spaces.",
-        "  {timeout_secs} : how long will the subscriber wait for published message.",
-        "  {message}      : message body.",
-        "                   remaining arguments are used as message body to publish.",
-        "",
-        "  Example:",
-        "  [hedwig: (standalone) 1] pubsub TOPIC SUBID 10 TEST_MESSAGES",
-        "",
-        "  1) hw will subscribe topic TOPIC as subscriber SUBID;",
-        "  2) subscriber SUBID will wait a message until 10 seconds;",
-        "  3) hw publishes TEST_MESSAGES to topic TOPIC;",
-        "  4) if subscriber received message in 10 secs, it checked that whether the message is published message.",
-        "     if true, it will return SUCCESS, otherwise return FAILED.",
-    };
-
-    //
-    // List all commands used to admin hedwig
-    //
-
-    /* SHOW: list all available hub servers or topics */
-    static final String SHOW = "show";
-    static final String SHOW_DESC = "list all available hub servers or topics";
-    static final String[] SHOW_USAGE = new String[] {
-        "usage: show [topics | hubs]",
-        "",
-        "  show topics :",
-        "    listing all available topics in hedwig.",
-        "",
-        "  show hubs :",
-        "    listing all available hubs in hedwig.",
-        "",
-        "  NOTES:",
-        "  'show topics' will not works when there are millions of topics in hedwig, since we have packetLen limitation fetching data from zookeeper.",
-    };
-
-    static final String SHOW_TOPICS = "topics";
-    static final String SHOW_HUBS   = "hubs";
-
-    /* DESCRIBE: show the metadata of a topic */
-    static final String DESCRIBE = "describe";
-    static final String DESCRIBE_DESC = "show metadata of a topic, including topic owner, persistence info, subscriptions info";
-    static final String[] DESCRIBE_USAGE = new String[] {
-        "usage: describe topic {topic}",
-        "",
-        "  {topic} : topic name.",
-        "            any printable string without spaces.",
-        "",
-        "  Example: describe topic ttttt",
-        "",
-        "  Output:",
-        "  ===== Topic Information : ttttt =====",
-        "",
-        "  Owner : 98.137.99.27:9875:9876",
-        "",
-        "  >>> Persistence Info <<<",
-        "  Ledger 54729 [ 1 ~ 59 ]",
-        "  Ledger 54731 [ 60 ~ 60 ]",
-        "  Ledger 54733 [ 61 ~ 61 ]",
-        "",
-        "  >>> Subscription Info <<<",
-        "  Subscriber mysub : consumeSeqId: local:50",
-    };
-
-    static final String DESCRIBE_TOPIC = "topic";
-
-    /* READTOPIC: read messages of a specified topic */
-    static final String READTOPIC = "readtopic";
-    static final String READTOPIC_DESC = "read messages of a specified topic";
-    static final String[] READTOPIC_USAGE = new String[] {
-        "usage: readtopic {topic} [start_msg_id]",
-        "",
-        "  {topic}        : topic name.",
-        "                   any printable string without spaces.",
-        "  [start_msg_id] : message id that start to read from.",
-        "",
-        "  no start_msg_id provided:",
-        "    it will start from least_consumed_message_id + 1.",
-        "    least_consume_message_id is computed from all its subscribers.",
-        "",
-        "  start_msg_id provided:",
-        "    it will start from MAX(start_msg_id, least_consumed_message_id).",
-        "",
-        "  MESSAGE FORMAT:",
-        "",
-        "  ---------- MSGID=LOCAL(51) ----------",
-        "  MsgId:     LOCAL(51)",
-        "  SrcRegion: standalone",
-        "  Message:",
-        "",
-        "  hello",
-    };
-
-    /* FORMAT: format metadata for Hedwig */
-    static final String FORMAT = "format";
-    static final String FORMAT_DESC = "format metadata for Hedwig";
-    static final String[] FORMAT_USAGE = new String[] {
-        "usage: format [-force]",
-        "",
-        "  [-force] : Format metadata for Hedwig w/o confirmation.",
-    };
-
-
-    //
-    // List other useful commands
-    //
-
-    /* SET: set whether printing zk watches or not */
-    static final String SET = "set";
-    static final String SET_DESC = "set whether printing zk watches or not";
-    static final String[] SET_USAGE = EMPTY_ARRAY;
-
-    /* HISTORY: list history commands */
-    static final String HISTORY = "history";
-    static final String HISTORY_DESC = "list history commands";
-    static final String[] HISTORY_USAGE = EMPTY_ARRAY;
-
-    /* REDO: redo previous command */
-    static final String REDO = "redo";
-    static final String REDO_DESC = "redo history command";
-    static final String[] REDO_USAGE = new String[] {
-        "usage: redo [{cmdno} | !]",
-        "",
-        "  {cmdno} : history command no.",
-        "  !       : last command.",
-    };
-
-    /* HELP: print usage information of a specified command */
-    static final String HELP = "help";
-    static final String HELP_DESC = "print usage information of a specified command";
-    static final String[] HELP_USAGE = new String[] {
-        "usage: help {command}",
-        "",
-        "  {command} : command name",
-    };
-
-    static final String QUIT = "quit";
-    static final String QUIT_DESC = "exit console";
-    static final String[] QUIT_USAGE = EMPTY_ARRAY;
-
-    static final String EXIT = "exit";
-    static final String EXIT_DESC = QUIT_DESC;
-    static final String[] EXIT_USAGE = EMPTY_ARRAY;
-
-    /**
-     * Every command and sub command of the console. Each constant carries
-     * its name, a one line description, its usage text and an initially
-     * empty, insertion-ordered table of sub commands.
-     */
-    public static enum COMMAND {
-
-        CMD_PUB (PUB, PUB_DESC, PUB_USAGE),
-        CMD_SUB (SUB, SUB_DESC, SUB_USAGE),
-        CMD_CLOSESUB (CLOSESUB, CLOSESUB_DESC, CLOSESUB_USAGE),
-        CMD_UNSUB (UNSUB, UNSUB_DESC, UNSUB_USAGE),
-        CMD_RMSUB (RMSUB, RMSUB_DESC, RMSUB_USAGE),
-        CMD_CONSUME (CONSUME, CONSUME_DESC, CONSUME_USAGE),
-        CMD_CONSUMETO (CONSUMETO, CONSUMETO_DESC, CONSUMETO_USAGE),
-        CMD_PUBSUB (PUBSUB, PUBSUB_DESC, PUBSUB_USAGE),
-        CMD_SHOW (SHOW, SHOW_DESC, SHOW_USAGE),
-        CMD_DESCRIBE (DESCRIBE, DESCRIBE_DESC, DESCRIBE_USAGE),
-        CMD_READTOPIC (READTOPIC, READTOPIC_DESC, READTOPIC_USAGE),
-        CMD_FORMAT (FORMAT, FORMAT_DESC, FORMAT_USAGE),
-        CMD_SET (SET, SET_DESC, SET_USAGE),
-        CMD_HISTORY (HISTORY, HISTORY_DESC, HISTORY_USAGE),
-        CMD_REDO (REDO, REDO_DESC, REDO_USAGE),
-        CMD_HELP (HELP, HELP_DESC, HELP_USAGE),
-        CMD_QUIT (QUIT, QUIT_DESC, QUIT_USAGE),
-        CMD_EXIT (EXIT, EXIT_DESC, EXIT_USAGE),
-        // sub commands
-        CMD_SHOW_TOPICS (SHOW_TOPICS, "", EMPTY_ARRAY),
-        CMD_SHOW_HUBS (SHOW_HUBS, "", EMPTY_ARRAY),
-        CMD_DESCRIBE_TOPIC (DESCRIBE_TOPIC, "", EMPTY_ARRAY);
-
-        // name of the command, also its key in any command table
-        protected String name;
-        // one line description printed in front of the usage text
-        protected String desc;
-        // usage text, one array element per output line
-        protected String[] usage;
-        // sub commands keyed by name, kept in registration order
-        protected Map<String, COMMAND> subCmds;
-
-        COMMAND(String cmdName, String cmdDesc, String[] cmdUsage) {
-            name = cmdName;
-            desc = cmdDesc;
-            usage = cmdUsage;
-            subCmds = new LinkedHashMap<String, COMMAND>();
-        }
-
-        /** @return the command name */
-        public String getName() {
-            return name;
-        }
-
-        /** @return the one line description */
-        public String getDescription() {
-            return desc;
-        }
-
-        /** @return the live (not copied) table of sub commands */
-        public Map<String, COMMAND> getSubCommands() {
-            return subCmds;
-        }
-
-        /**
-         * Register a sub command under its own name, replacing any sub
-         * command previously registered with that name.
-         *
-         * @param c sub command to register
-         */
-        public void addSubCommand(COMMAND c) {
-            subCmds.put(c.name, c);
-        }
-
-        /**
-         * Print "name: description", the usage lines and a trailing blank
-         * line to standard error.
-         */
-        public void printUsage() {
-            System.err.println(name + ": " + desc);
-            for (int i = 0; i < usage.length; i++) {
-                System.err.println(usage[i]);
-            }
-            System.err.println();
-        }
-    }
-
-    static Map<String, COMMAND> commands = null;
-
-    private static void addCommand(COMMAND c) {
-        commands.put(c.getName(), c);
-    }
-
-    static synchronized void init() {
-        if (commands != null) {
-            return;
-        }
-        commands = new LinkedHashMap<String, COMMAND>();
-
-        addCommand(COMMAND.CMD_PUB);
-        addCommand(COMMAND.CMD_SUB);
-        addCommand(COMMAND.CMD_CLOSESUB);
-        addCommand(COMMAND.CMD_UNSUB);
-        addCommand(COMMAND.CMD_RMSUB);
-        addCommand(COMMAND.CMD_CONSUME);
-        addCommand(COMMAND.CMD_CONSUMETO);
-        addCommand(COMMAND.CMD_PUBSUB);
-
-        // show
-        COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_TOPICS);
-        COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_HUBS);
-        addCommand(COMMAND.CMD_SHOW);
-
-        // describe
-        COMMAND.CMD_DESCRIBE.addSubCommand(COMMAND.CMD_DESCRIBE_TOPIC);
-        addCommand(COMMAND.CMD_DESCRIBE);
-
-        addCommand(COMMAND.CMD_READTOPIC);
-        addCommand(COMMAND.CMD_FORMAT);
-        addCommand(COMMAND.CMD_SET);
-        addCommand(COMMAND.CMD_HISTORY);
-        addCommand(COMMAND.CMD_REDO);
-        addCommand(COMMAND.CMD_HELP);
-        addCommand(COMMAND.CMD_QUIT);
-        addCommand(COMMAND.CMD_EXIT);
-    }
-
-    /**
-     * Get the top level command table, building it first if necessary.
-     *
-     * @return the live (not copied) table of top level commands keyed by
-     *         command name; never null
-     */
-    public static Map<String, COMMAND> getHedwigCommands() {
-        // init() is synchronized and returns immediately once the table
-        // exists, so callers no longer get null when they forgot to call it
-        init();
-        return commands;
-    }
-
-    /**
-     * Find candidate commands by the specified token list.
-     *
-     * All tokens except the last one are followed as a command path
-     * (command, then sub command, ...). The names available at the end of
-     * that path are returned; the last token itself is not inspected.
-     *
-     * @param tokens token list; null is treated as "no candidates"
-     *
-     * @return list of candidate commands, empty if a token on the path is
-     *         not a known command or has no sub commands
-     */
-    public static List<String> findCandidateCommands(String[] tokens) {
-        List<String> cmds = new LinkedList<String>();
-        if (tokens == null) {
-            return cmds;
-        }
-        // build the command table if nobody did so far, instead of
-        // dereferencing a null table below
-        init();
-
-        Map<String, COMMAND> cmdMap = commands;
-        for (int i = 0; i < tokens.length - 1; i++) {
-            COMMAND c = cmdMap.get(tokens[i]);
-            // unknown command, or a command without sub commands
-            if (c == null || c.getSubCommands().isEmpty()) {
-                return cmds;
-            }
-            cmdMap = c.getSubCommands();
-        }
-        cmds.addAll(cmdMap.keySet());
-        return cmds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
deleted file mode 100644
index 3a5ef5a..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
+++ /dev/null
@@ -1,1038 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import jline.ConsoleReader;
-import jline.History;
-import jline.Terminal;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.admin.HedwigAdmin;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.util.SubscriptionListener;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import static com.google.common.base.Charsets.UTF_8;
-
-import static org.apache.hedwig.admin.console.HedwigCommands.*;
-import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*;
-
-/**
- * Console Client to Hedwig
- */
-public class HedwigConsole {
-    private static final Logger LOG = LoggerFactory.getLogger(HedwigConsole.class);
-    // NOTE: now it is fixed passwd in bookkeeper
-    static byte[] passwd = "sillysecret".getBytes(UTF_8);
-
-    // history file name
-    static final String HW_HISTORY_FILE = ".hw_history";
-
-    static final char[] CONTINUE_OR_QUIT = new char[] { 'Q', 'q', '\n' };
-
-    protected MyCommandOptions cl = new MyCommandOptions();
-    protected HashMap<Integer, String> history = new LinkedHashMap<Integer, String>();
-    protected int commandCount = 0;
-    protected boolean printWatches = true;
-    protected Map<String, MyCommand> myCommands;
-
-    protected boolean inConsole = true;
-    protected ConsoleReader console = null;
-
-    protected HedwigAdmin admin;
-    protected HedwigClient hubClient;
-    protected Publisher publisher;
-    protected Subscriber subscriber;
-    protected ConsoleMessageHandler consoleHandler =
-            new ConsoleMessageHandler();
-    protected Terminal terminal;
-
-    protected String myRegion;
-
-    /**
-     * Handler of one console command.
-     */
-    interface MyCommand {
-        /**
-         * Run the command.
-         *
-         * @param args command tokens; the implementations visible here read
-         *             their first argument from args[1]
-         * @return the implementations visible here return false when the
-         *         arguments are missing or rejected and true otherwise
-         *         (NOTE(review): how the caller reacts to false is not
-         *         visible in this part of the file - confirm)
-         * @throws Exception if running the command fails
-         */
-        boolean runCmd(String[] args) throws Exception;
-    }
-
-    /**
-     * help: print the usage of the command named by args[1], or the general
-     * usage when no command is named or the name is unknown.
-     */
-    static class HelpCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            COMMAND target = null;
-            if (args.length >= 2) {
-                target = getHedwigCommands().get(args[1]);
-            }
-            if (target == null) {
-                // nothing (known) was asked for
-                usage();
-            } else {
-                target.printUsage();
-            }
-            return true;
-        }
-    }
-
-    /**
-     * quit / exit: close the hub client and the admin client, then
-     * terminate the JVM with status 0.
-     */
-    class ExitCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            printMessage("Quitting ...");
-            hubClient.close();
-            admin.close();
-            // System.exit delegates to Runtime.getRuntime().exit
-            System.exit(0);
-            // only here to satisfy the compiler
-            return true;
-        }
-    }
-
-    class RedoCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 2) {
-                return false;
-            }
-
-            int index;
-            if ("!".equals(args[1])) {
-                index = commandCount - 1;
-            } else {
-                index = Integer.decode(args[1]);
-                if (commandCount <= index) {
-                    System.err.println("Command index out of range");
-                    return false;
-                }
-            }
-            cl.parseCommand(history.get(index));
-            if (cl.getCommand().equals("redo")) {
-                System.err.println("No redoing redos");
-                return false;
-            }
-            history.put(commandCount, history.get(index));
-            processCmd(cl);
-            return true;
-        }
-        
-    }
-
-    /**
-     * history: print the history entries numbered from commandCount - 10
-     * (but not below 0) up to and including commandCount.
-     */
-    class HistoryCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            final int first = Math.max(0, commandCount - 10);
-            for (int no = first; no <= commandCount; no++) {
-                System.out.println(no + " - " + history.get(no));
-            }
-            return true;
-        }
-
-    }
-
-    /**
-     * set printwatches [on|off]: without a value print the current
-     * setting, with a value switch it (anything but "on" means off).
-     */
-    class SetCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            // Only the option name is mandatory. The previous guard required
-            // three tokens, which made the "print current value" branch below
-            // unreachable.
-            if (args.length < 2 || !"printwatches".equals(args[1])) {
-                return false;
-            }
-            if (args.length == 2) {
-                System.out.println("printwatches is " + (printWatches ? "on" : "off"));
-            } else {
-                printWatches = args[2].equals("on");
-            }
-            return true;
-        }
-
-    }
-
-    /**
-     * pub: publish a message to the topic args[1]. The message body is
-     * made of args[2] and all following tokens joined by single spaces.
-     */
-    class PubCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 3) {
-                return false;
-            }
-
-            // at least one body token exists here; the rest is appended
-            // with a separating space in front of each
-            StringBuilder body = new StringBuilder();
-            body.append(args[2]);
-            for (int pos = 3; pos < args.length; pos++) {
-                body.append(' ').append(args[pos]);
-            }
-
-            ByteString topic = ByteString.copyFromUtf8(args[1]);
-            Message msg = Message.newBuilder()
-                                 .setBody(ByteString.copyFromUtf8(body.toString()))
-                                 .build();
-            try {
-                publisher.publish(topic, msg);
-                System.out.println("PUB DONE");
-            } catch (Exception e) {
-                System.err.println("PUB FAILED");
-                e.printStackTrace();
-            }
-            return true;
-        }
-
-    }
-
-    /**
-     * Message handler that prints each delivered message to standard out
-     * and then reports the delivery as finished through the callback.
-     */
-    static class ConsoleMessageHandler implements MessageHandler {
-
-        @Override
-        public void deliver(ByteString topic, ByteString subscriberId,
-                Message msg, Callback<Void> callback, Object context) {
-            final String topicName = topic.toStringUtf8();
-            final String subscriberName = subscriberId.toStringUtf8();
-            final String body = msg.getBody().toStringUtf8();
-            System.out.println("Received message from topic " + topicName
-                    + " for subscriber " + subscriberName + " : " + body);
-            callback.operationFinished(context, null);
-        }
-
-    }
-
-    /**
-     * Subscription listener that prints every received subscription event
-     * together with its topic and subscriber to standard out.
-     */
-    static class ConsoleSubscriptionListener implements SubscriptionListener {
-
-        @Override
-        public void processEvent(ByteString t, ByteString s, SubscriptionEvent event) {
-            final String topicName = t.toStringUtf8();
-            final String subscriberName = s.toStringUtf8();
-            System.out.println("Subscription Channel for (topic:" + topicName
-                                + ", subscriber:" + subscriberName
-                                + ") received event : " + event);
-        }
-    }
-
-    /**
-     * sub: subscribe subscriber args[2] to topic args[1]. The optional
-     * args[3] is the numeric subscription mode (ATTACH when absent) and the
-     * optional args[4] tells whether delivery is started (true when absent).
-     */
-    class SubCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 3) {
-                return false;
-            }
-
-            CreateOrAttach mode = CreateOrAttach.ATTACH;
-            if (args.length > 3) {
-                try {
-                    mode = CreateOrAttach.valueOf(Integer.parseInt(args[3]));
-                } catch (Exception e) {
-                    // reported below, together with an unmapped number
-                    mode = null;
-                }
-                if (mode == null) {
-                    System.err.println("Unknow mode : " + args[3]);
-                    return false;
-                }
-            }
-            // Boolean.parseBoolean never throws; anything but "true"
-            // (ignoring case) yields false
-            final boolean receive = args.length < 5 || Boolean.parseBoolean(args[4]);
-
-            ByteString topic = ByteString.copyFromUtf8(args[1]);
-            ByteString subId = ByteString.copyFromUtf8(args[2]);
-            try {
-                SubscriptionOptions.Builder builder = SubscriptionOptions.newBuilder();
-                builder.setCreateOrAttach(mode);
-                builder.setForceAttach(false);
-                subscriber.subscribe(topic, subId, builder.build());
-                if (!receive) {
-                    System.out.println("SUB DONE BUT NOT RECEIVE");
-                } else {
-                    subscriber.startDelivery(topic, subId, consoleHandler);
-                    System.out.println("SUB DONE AND RECEIVE");
-                }
-            } catch (Exception e) {
-                System.err.println("SUB FAILED");
-                e.printStackTrace();
-            }
-            return true;
-        }
-    }
-
-    /**
-     * unsub: stop delivery for subscriber args[2] on topic args[1] and
-     * then unsubscribe it.
-     */
-    class UnsubCmd implements MyCommand {
-
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 3) {
-                return false;
-            }
-            final ByteString topicName = ByteString.copyFromUtf8(args[1]);
-            final ByteString subscriberName = ByteString.copyFromUtf8(args[2]);
-            try {
-                subscriber.stopDelivery(topicName, subscriberName);
-                subscriber.unsubscribe(topicName, subscriberName);
-                System.out.println("UNSUB DONE");
-            } catch (Exception e) {
-                System.err.println("UNSUB FAILED");
-                e.printStackTrace();
-            }
-            return true;
-        }
-
-    }
-
-    /**
-     * RMSUB command: for every topic {@code <topicPrefix><i>} and every
-     * subscriber {@code <subPrefix><j>} in the two inclusive ranges, attaches
-     * the subscription (creating it if needed) and then unsubscribes it.
-     * Tokens: topic prefix, first topic, last topic, subscriber prefix,
-     * first subscriber, last subscriber.
-     */
-    class RmsubCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least seven are required
-         * @return false when tokens are missing or a range is reversed, true
-         *         otherwise (per-topic failures are reported on stderr)
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 7) {
-                return false;
-            }
-            final String topicPrefix = args[1];
-            final int firstTopic = Integer.parseInt(args[2]);
-            final int lastTopic = Integer.parseInt(args[3]);
-            final String subPrefix = args[4];
-            final int firstSub = Integer.parseInt(args[5]);
-            final int lastSub = Integer.parseInt(args[6]);
-            // both ranges are inclusive and must run low to high
-            final boolean rangesValid = firstTopic <= lastTopic && firstSub <= lastSub;
-            if (!rangesValid) {
-                return false;
-            }
-            for (int topicNo = firstTopic; topicNo <= lastTopic; topicNo++) {
-                final ByteString topic = ByteString.copyFromUtf8(topicPrefix + topicNo);
-                try {
-                    // a failure aborts the remaining subscribers of this topic only
-                    for (int subNo = firstSub; subNo <= lastSub; subNo++) {
-                        final ByteString sub = ByteString.copyFromUtf8(subPrefix + subNo);
-                        final SubscriptionOptions attachOpts = SubscriptionOptions.newBuilder()
-                            .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH)
-                            .build();
-                        subscriber.subscribe(topic, sub, attachOpts);
-                        subscriber.unsubscribe(topic, sub);
-                    }
-                    System.out.println("RMSUB " + topic.toStringUtf8() + " DONE");
-                } catch (Exception failure) {
-                    System.err.println("RMSUB " + topic.toStringUtf8() + " FAILED");
-                    failure.printStackTrace();
-                }
-            }
-            return true;
-        }
-
-    }
-    
-    /**
-     * CLOSESUB command: stops message delivery for a subscription and then
-     * closes it. {@code args[1]} is the topic, {@code args[2]} the subscriber id.
-     */
-    class CloseSubscriptionCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least three are required
-         * @return false when too few tokens were given, true otherwise (a
-         *         failed close is reported on stderr, not via the result)
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 3) {
-                return false;
-            }
-            ByteString topic = ByteString.copyFromUtf8(args[1]);
-            ByteString subId = ByteString.copyFromUtf8(args[2]);
-
-            try {
-                subscriber.stopDelivery(topic, subId);
-                subscriber.closeSubscription(topic, subId);
-            } catch (Exception e) {
-                System.err.println("CLOSESUB FAILED");
-                // show the cause, as the UNSUB and RMSUB commands do
-                e.printStackTrace();
-            }
-            return true;
-        }
-
-    }
-    
-    /**
-     * CONSUMETO command: sends a consume request for an explicit message id.
-     * {@code args[1]} is the topic, {@code args[2]} the subscriber id and
-     * {@code args[3]} the message id, used as the local component of the
-     * sequence id.
-     */
-    class ConsumeToCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least four are required
-         * @return false when too few tokens were given, true otherwise (a
-         *         failed consume is reported on stderr, not via the result)
-         * @throws NumberFormatException if {@code args[3]} is not a long
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 4) {
-                return false;
-            }
-            ByteString topic = ByteString.copyFromUtf8(args[1]);
-            ByteString subId = ByteString.copyFromUtf8(args[2]);
-            long msgId = Long.parseLong(args[3]);
-            MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(msgId).build();
-            try {
-                subscriber.consume(topic, subId, consumeId);
-            } catch (Exception e) {
-                System.err.println("CONSUMETO FAILED");
-                // show the cause, as the UNSUB and RMSUB commands do
-                e.printStackTrace();
-            }
-            return true;
-        }
-
-    }
-    
-    /**
-     * CONSUME command: advances a subscription's consume pointer by a number
-     * of messages relative to the last consumed id read through the admin
-     * client. {@code args[1]} is the topic, {@code args[2]} the subscriber id
-     * and {@code args[3]} the number of messages to consume.
-     */
-    class ConsumeCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least four are required
-         * @return false when too few tokens were given, true otherwise
-         *         (failures are reported on stderr, not via the result)
-         * @throws NumberFormatException if {@code args[3]} is not a long
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 4) {
-                return false;
-            }
-            // built once and used for both the lookup and the consume request
-            ByteString topic = ByteString.copyFromUtf8(args[1]);
-            ByteString subId = ByteString.copyFromUtf8(args[2]);
-            SubscriptionData subData = admin.getSubscription(topic, subId);
-            if (null == subData) {
-                System.err.println("Failed to read subscription for topic: " + args[1]
-                                 + " subscriber: " + args[2]);
-                return true;
-            }
-            long lastConsumedId = subData.getState().getMsgId().getLocalComponent();
-            long numMessagesToConsume = Long.parseLong(args[3]);
-            long idToConsumed = lastConsumedId + numMessagesToConsume;
-            System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
-                             + ") from " + lastConsumedId + " to " + idToConsumed);
-            MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(idToConsumed).build();
-            try {
-                subscriber.consume(topic, subId, consumeId);
-            } catch (Exception e) {
-                System.err.println("CONSUME FAILED");
-                // show the cause, as the UNSUB and RMSUB commands do
-                e.printStackTrace();
-            }
-            return true;
-        }
-
-    }
-
-    /**
-     * PUBSUB command: an end-to-end check that subscribes to a topic,
-     * publishes one message, starts delivery and waits until that message
-     * comes back or the timeout expires. Tokens: topic, subscriber id prefix,
-     * timeout in seconds, then the message words. The start timestamp is
-     * appended to both the subscriber id and the message body.
-     */
-    class PubSubCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least five are required
-         * @return true only when the published message was received within
-         *         the timeout; false on too few tokens, timeout or any failure
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 5) {
-                return false;
-            }
-            final long startTime = MathUtils.now();
-
-            final ByteString topic = ByteString.copyFromUtf8(args[1]);
-            final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime);
-            int timeoutSecs = 60;
-            try {
-                timeoutSecs = Integer.parseInt(args[3]);
-            } catch (NumberFormatException nfe) {
-                // keep the default, but do not hide that the value was rejected
-                System.err.println("Invalid timeout '" + args[3] + "', using "
-                                   + timeoutSecs + " seconds.");
-            }
-
-            StringBuilder sb = new StringBuilder();
-            for (int i=4; i<args.length; i++) {
-                sb.append(args[i]);
-                if (i != args.length - 1) {
-                    sb.append(' ');
-                }
-            }
-            // append a timestamp tag
-            ByteString msgBody = ByteString.copyFromUtf8(sb.toString() + "-" + startTime);
-            final Message msg = Message.newBuilder().setBody(msgBody).build();
-
-            boolean subscribed = false;
-            boolean success = false;
-            final CountDownLatch isDone = new CountDownLatch(1);
-            long elapsedTime = 0L;
-
-            System.out.println("Starting PUBSUB test ...");
-            try {
-                // sub the topic
-                SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                    .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-                subscriber.subscribe(topic, subId, opts);
-                subscribed = true;
-
-                System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
-
-                // pub topic
-                publisher.publish(topic, msg);
-                System.out.println("Pub topic " + topic.toStringUtf8() + " : " + msg.getBody().toStringUtf8());
-
-                // order kept from the original: subscribe, publish, then start delivery
-                subscriber.startDelivery(topic, subId, new MessageHandler() {
-
-                    @Override
-                    public void deliver(ByteString thisTopic, ByteString subscriberId,
-                            Message message, Callback<Void> callback, Object context) {
-                        // only the message published above completes the test
-                        if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
-                            msg.getBody().equals(message.getBody())) {
-                            System.out.println("Received message : " + message.getBody().toStringUtf8());
-                            isDone.countDown();
-                        }
-                        callback.operationFinished(context, null);
-                    }
-
-                });
-
-                // wait for the message
-                success = isDone.await(timeoutSecs, TimeUnit.SECONDS);
-                elapsedTime = MathUtils.now() - startTime;
-            } catch (Exception e) {
-                // previously discarded by a 'return' inside 'finally'; the result is
-                // still 'false', but the cause is now visible
-                e.printStackTrace();
-            } finally {
-                if (subscribed) {
-                    try {
-                        subscriber.stopDelivery(topic, subId);
-                        subscriber.unsubscribe(topic, subId);
-                    } catch (Exception cleanupFailure) {
-                        // a cleanup failure does not change the test outcome
-                        cleanupFailure.printStackTrace();
-                    }
-                }
-            }
-            if (success) {
-                System.out.println("PUBSUB SUCCESS. TIME: " + elapsedTime + " MS");
-            } else {
-                System.out.println("PUBSUB FAILED. ");
-            }
-            return success;
-        }
-
-    }
-    
-    /**
-     * READTOPIC command: reads a topic through a {@link ReadTopic} helper.
-     * {@code args[1]} is the topic; an optional {@code args[2]} is parsed as a
-     * long and passed to the helper's constructor.
-     */
-    class ReadTopicCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least two are required
-         * @return false when the topic is missing, true once the read returns
-         * @throws NumberFormatException if the optional {@code args[2]} is not a long
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 2) {
-                return false;
-            }
-            final ByteString topicName = ByteString.copyFromUtf8(args[1]);
-            final ReadTopic reader = (args.length == 2)
-                ? new ReadTopic(admin, topicName, inConsole)
-                : new ReadTopic(admin, topicName, Long.parseLong(args[2]), inConsole);
-            reader.readTopic();
-            return true;
-        }
-
-    }
-
-    /**
-     * SHOW command: lists either the available hub servers or the topics,
-     * depending on {@code args[1]}.
-     */
-    class ShowCmd implements MyCommand {
-
-        // topics printed per page before the user is asked whether to go on
-        static final int MAX_TOPICS_PER_SHOW = 100;
-
-        /**
-         * @param args command tokens; at least two are required
-         * @return false when the sub-command is missing or unknown, true
-         *         otherwise (fetch failures are reported on stderr)
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            if (args.length < 2) {
-                return false;
-            }
-            // set just before each fetch so the catch block can name what failed
-            String failureText = null;
-            try {
-                final String what = args[1];
-                if (HedwigCommands.SHOW_HUBS.equals(what)) {
-                    failureText = "Unable to fetch the list of hub servers";
-                    showHubs();
-                } else if (!HedwigCommands.SHOW_TOPICS.equals(what)) {
-                    System.err.println("ERROR: Unknown show command '" + what + "'");
-                    return false;
-                } else {
-                    failureText = "Unable to fetch the list of topics";
-                    showTopics();
-                }
-            } catch (Exception failure) {
-                if (failureText != null) {
-                    System.err.println(failureText);
-                }
-                failure.printStackTrace();
-            }
-            return true;
-        }
-
-        /** Prints one line per hub returned by the admin client. */
-        protected void showHubs() throws Exception {
-            final Map<HedwigSocketAddress, HedwigAdmin.HubStats> hubStats = admin.getAvailableHubs();
-            System.out.println("Available Hub Servers:");
-            for (Map.Entry<HedwigSocketAddress, HedwigAdmin.HubStats> hub : hubStats.entrySet()) {
-                System.out.println("\t" + hub.getKey() + " :\t" + hub.getValue());
-            }
-        }
-
-        /**
-         * Prints the topics in pages of {@link #MAX_TOPICS_PER_SHOW}, asking
-         * after each full page whether to continue.
-         */
-        protected void showTopics() throws Exception {
-            final List<String> page = new ArrayList<String>();
-            final Iterator<ByteString> topicIter = admin.getTopics();
-
-            System.out.println("Topic List:");
-            while (topicIter.hasNext()) {
-                if (page.size() >= MAX_TOPICS_PER_SHOW) {
-                    // flush the full page before fetching the next topic
-                    System.out.println(page);
-                    page.clear();
-                    if (!continueOrQuit()) {
-                        // user cancelled: nothing further is printed
-                        return;
-                    }
-                }
-                page.add(topicIter.next().toStringUtf8());
-            }
-            // last, possibly partial or empty, page
-            System.out.println(page);
-        }
-
-    }
-
-    /**
-     * DESCRIBE command: prints owner, ledger ranges and subscriptions of a
-     * topic. {@code args[1]} selects what to describe (only the topic form is
-     * handled) and {@code args[2]} is the topic name.
-     */
-    class DescribeCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; at least three are required
-         * @return false when tokens are missing or the sub-command is not the
-         *         topic form; otherwise the result of {@link #describeTopic}
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            return args.length >= 3
-                && HedwigCommands.DESCRIBE_TOPIC.equals(args[1])
-                && describeTopic(args[2]);
-        }
-
-        /**
-         * Fetches owner, ledgers and subscriptions of the topic from the admin
-         * client, then prints them.
-         *
-         * @param topic topic name
-         * @return always true
-         */
-        protected boolean describeTopic(String topic) throws Exception {
-            final ByteString topicKey = ByteString.copyFromUtf8(topic);
-            // all three lookups are done before anything is printed
-            final HubInfo owner = admin.getTopicOwner(topicKey);
-            final List<LedgerRange> ledgers = admin.getTopicLedgers(topicKey);
-            final Map<ByteString, SubscriptionData> subscriptions = admin.getTopicSubscriptions(topicKey);
-
-            final String ownerText;
-            if (owner == null) {
-                ownerText = "NULL";
-            } else {
-                // flatten the multi-line text form onto a single line
-                ownerText = owner.toString().trim().replaceAll("\n", ", ");
-            }
-
-            System.out.println("===== Topic Information : " + topic + " =====");
-            System.out.println();
-            System.out.println("Owner : " + ownerText);
-            System.out.println();
-
-            printTopicLedgers(ledgers);
-            printTopicSubscriptions(subscriptions);
-
-            return true;
-        }
-
-        /** Prints one line per ledger range, or a placeholder when there is none. */
-        private void printTopicLedgers(List<LedgerRange> ranges) {
-            System.out.println(">>> Persistence Info <<<");
-            if (ranges == null) {
-                System.out.println("N/A");
-            } else if (ranges.isEmpty()) {
-                System.out.println("No Ledger used.");
-            } else {
-                for (LedgerRange ledger : ranges) {
-                    System.out.println("Ledger " + ledger.getLedgerId() + " [ "
-                                       + ledger.getStartSeqIdIncluded() + " ~ "
-                                       + ledger.getEndSeqIdIncluded().getLocalComponent() + " ]");
-                }
-                System.out.println();
-            }
-        }
-
-        /** Prints one line per subscriber, or a placeholder when there is none. */
-        private void printTopicSubscriptions(Map<ByteString, SubscriptionData> states) {
-            System.out.println(">>> Subscription Info <<<");
-            if (states.isEmpty()) {
-                System.out.println("No subscriber.");
-                return;
-            }
-            for (Map.Entry<ByteString, SubscriptionData> sub : states.entrySet()) {
-                final String subscriberName = sub.getKey().toStringUtf8();
-                System.out.println("Subscriber " + subscriberName + " : "
-                                 + SubscriptionStateUtils.toString(sub.getValue()));
-            }
-            System.out.println();
-        }
-
-    }
-
-    /**
-     * FORMAT command: formats the hedwig metadata through the admin client.
-     * Asks for confirmation unless {@code args[1]} is {@code -force}.
-     */
-    class FormatCmd implements MyCommand {
-
-        /**
-         * @param args command tokens; {@code args[1]} may be {@code -force}
-         * @return always true, whether or not the format was carried out
-         */
-        @Override
-        public boolean runCmd(String[] args) throws Exception {
-            final boolean force = args.length >= 2 && "-force".equals(args[1]);
-            System.out.println("You ask to format hedwig metadata stored in "
-                               + admin.getMetadataManagerFactory().getClass().getName() + ".");
-            // the prompt is skipped entirely when forced
-            final boolean proceed = force || continueOrQuit();
-            if (!proceed) {
-                System.out.println("Given up formatting hedwig metadata.");
-                return true;
-            }
-            admin.format();
-            System.out.println("Formatted hedwig metadata successfully.");
-            return true;
-        }
-
-    }
-
-    /**
-     * Builds the table mapping each command name to the object that executes it.
-     *
-     * @return a new map from command name to command implementation
-     */
-    protected Map<String, MyCommand> buildMyCommands() {
-        final Map<String, MyCommand> table = new HashMap<String, MyCommand>();
-
-        // EXIT and QUIT share a single instance
-        final ExitCmd leave = new ExitCmd();
-        table.put(EXIT, leave);
-        table.put(QUIT, leave);
-
-        // console housekeeping
-        table.put(HELP, new HelpCmd());
-        table.put(HISTORY, new HistoryCmd());
-        table.put(REDO, new RedoCmd());
-        table.put(SET, new SetCmd());
-
-        // publish / subscribe operations
-        table.put(PUB, new PubCmd());
-        table.put(SUB, new SubCmd());
-        table.put(PUBSUB, new PubSubCmd());
-        table.put(CLOSESUB, new CloseSubscriptionCmd());
-        table.put(UNSUB, new UnsubCmd());
-        table.put(RMSUB, new RmsubCmd());
-        table.put(CONSUME, new ConsumeCmd());
-        table.put(CONSUMETO, new ConsumeToCmd());
-
-        // inspection and administration
-        table.put(SHOW, new ShowCmd());
-        table.put(DESCRIBE, new DescribeCmd());
-        table.put(READTOPIC, new ReadTopicCmd());
-        table.put(FORMAT, new FormatCmd());
-
-        return table;
-    }
-
-    /**
-     * Prints the console's command-line synopsis and the names of all
-     * commands returned by {@code getHedwigCommands()} to stderr.
-     */
-    static void usage() {
-        System.err.println("HedwigConsole [options] [command] [args]");
-        System.err.println();
-        // user-facing typo fixed: was "Avaiable"
-        System.err.println("Available commands:");
-        for (String cmd : getHedwigCommands().keySet()) {
-            System.err.println("\t" + cmd);
-        }
-        System.err.println();
-    }
-
-    /**
-     * A storage class for both command line options and shell commands.
-     * Options are {@code -name value} pairs; the first token that does not
-     * start with {@code -} is the command, and it plus everything after it
-     * form the command arguments (the command itself is argument 0).
-     */
-    static private class MyCommandOptions {
-
-        private final Map<String,String> options = new HashMap<String,String>();
-        // Never null: stays empty until a command has been parsed, so the
-        // accessors below can be called safely when no command was given.
-        private List<String> cmdArgs = new ArrayList<String>();
-        private String command = null;
-
-        public MyCommandOptions() {
-        }
-
-        /**
-         * @param opt option name without the leading dash
-         * @return the option's value, or null when it was not given
-         */
-        public String getOption(String opt) {
-            return options.get(opt);
-        }
-
-        /** @return the parsed command name, or null when none was parsed yet */
-        public String getCommand( ) {
-            return command;
-        }
-
-        /**
-         * @param index argument position; 0 is the command itself
-         * @return the argument at that position
-         * @throws IndexOutOfBoundsException if there is no such argument
-         */
-        public String getCmdArgument( int index ) {
-            return cmdArgs.get(index);
-        }
-
-        /** @return number of command arguments including the command; 0 when none was parsed */
-        public int getNumArguments( ) {
-            return cmdArgs.size();
-        }
-
-        /** @return the command arguments as a new array; empty when none was parsed */
-        public String[] getArgArray() {
-            return cmdArgs.toArray(new String[0]);
-        }
-
-        /**
-         * Parses a command line that may contain one or more flags
-         * before an optional command string
-         * @param args command line arguments
-         * @return true if parsing succeeded, false otherwise.
-         */
-        public boolean parseOptions(String[] args) {
-            Iterator<String> it = Arrays.asList(args).iterator();
-
-            while (it.hasNext()) {
-                String opt = it.next();
-                if (!opt.startsWith("-")) {
-                    // first non-flag token: it and the rest are the command line
-                    command = opt;
-                    cmdArgs = new ArrayList<String>( );
-                    cmdArgs.add( command );
-                    while (it.hasNext()) {
-                        cmdArgs.add(it.next());
-                    }
-                    return true;
-                }
-                // a flag must be followed by its value
-                if (!it.hasNext()) {
-                    System.err.println("Error: no argument found for option "
-                            + opt);
-                    return false;
-                }
-                options.put(opt.substring(1), it.next());
-            }
-            return true;
-        }
-
-        /**
-         * Breaks a string into command + arguments.
-         * @param cmdstring string of form "cmd arg1 arg2..etc"
-         * @return true if parsing succeeded; false when the string yields no
-         *         tokens, in which case the previously parsed command is kept.
-         */
-        public boolean parseCommand( String cmdstring ) {
-            String[] args = cmdstring.split(" ");
-            if (args.length == 0){
-                return false;
-            }
-            command = args[0];
-            cmdArgs = Arrays.asList(args);
-            return true;
-        }
-    }
-
-    /**
-     * Watcher that echoes every event it receives through
-     * {@code printMessage}, but only while {@code getPrintWatches()} is true.
-     */
-    private class MyWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) {
-            if (!getPrintWatches()) {
-                return;
-            }
-            printMessage("WATCHER::");
-            printMessage(event.toString());
-        }
-    }
-
-    /**
-     * Prints a message on stdout, preceded by a line break, when
-     * {@code inConsole} is set; otherwise prints nothing.
-     *
-     * @param msg text to print
-     */
-    public void printMessage(String msg) {
-        if (!inConsole) {
-            return;
-        }
-        System.out.println("\n"+msg);
-    }
-
-    /**
-     * Creates the console: sets up the terminal, parses the command line,
-     * loads the optional {@code server-cfg} and {@code client-cfg} files and
-     * connects the admin client and the hub client.
-     *
-     * @param args command line arguments
-     * @throws IOException if a configuration file cannot be loaded or the
-     *         admin client cannot be created
-     * @throws InterruptedException
-     */
-    public HedwigConsole(String[] args) throws IOException, InterruptedException {
-        // Setup Terminal
-        terminal = Terminal.setupTerminal();
-        HedwigCommands.init();
-        cl.parseOptions(args);
-
-        // without a command on the command line the console is interactive
-        inConsole = (cl.getCommand() == null);
-
-        org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
-            new org.apache.bookkeeper.conf.ClientConfiguration();
-        ServerConfiguration hubServerConf = new ServerConfiguration();
-        final String serverCfgFile = cl.getOption("server-cfg");
-        if (null != serverCfgFile) {
-            // the same file feeds both the hub server and the bookkeeper client settings
-            try {
-                hubServerConf.loadConf(new File(serverCfgFile).toURI().toURL());
-                bkClientConf.loadConf(new File(serverCfgFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                throw new IOException(e);
-            }
-        }
-
-        ClientConfiguration hubClientCfg = new ClientConfiguration();
-        final String clientCfgFile = cl.getOption("client-cfg");
-        if (null != clientCfgFile) {
-            try {
-                hubClientCfg.loadConf(new File(clientCfgFile).toURI().toURL());
-            } catch (ConfigurationException e) {
-                throw new IOException(e);
-            }
-        }
-
-        printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin");
-        try {
-            admin = new HedwigAdmin(bkClientConf, hubServerConf);
-            admin.getZkHandle().register(new MyWatcher());
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-
-        printMessage("Connecting to default hub server " + hubClientCfg.getDefaultServerHost());
-        hubClient = new HedwigClient(hubClientCfg);
-        publisher = hubClient.getPublisher();
-        subscriber = hubClient.getSubscriber();
-        subscriber.addSubscriptionListener(new ConsoleSubscriptionListener());
-
-        // other parameters
-        myRegion = hubServerConf.getMyRegion();
-    }
-
-    /**
-     * @return the current value of the {@code printWatches} field, which
-     *         {@code MyWatcher} consults before echoing an event
-     */
-    public boolean getPrintWatches() {
-        return printWatches;
-    }
-
-    /**
-     * @return the interactive prompt, showing the region and the current
-     *         command counter
-     */
-    protected String getPrompt() {
-        return "[hedwig: (" + myRegion + ") " + commandCount + "] ";
-    }
-
-    /**
-     * Asks the user whether to go on and blocks until an answer is read.
-     * Uses the jline console when one exists, otherwise reads raw characters
-     * from {@code System.in} through the terminal.
-     *
-     * @return false when the user typed {@code q}/{@code Q} or the input
-     *         ended; true otherwise
-     * @throws IOException if reading the answer fails
-     */
-    protected boolean continueOrQuit() throws IOException {
-        System.out.println("Press <Return> to continue, or Q to cancel ...");
-        int ch;
-        if (null != console) {
-            ch = console.readCharacter(CONTINUE_OR_QUIT);
-        } else {
-            do {
-                ch = terminal.readCharacter(System.in);
-                // -1 is taken as end of input (InputStream.read convention); without
-                // this check the loop would never end once stdin is exhausted
-            } while (ch != 'q' && ch != 'Q' && ch != '\n' && ch != -1);
-        }
-        // end of input counts as "cancel", the safe answer for callers such as FORMAT
-        if (ch == 'q' || ch == 'Q' || ch == -1) {
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Stores a command line in the {@code history} field under the given index.
-     *
-     * @param i   index to store the command under (callers pass the command counter)
-     * @param cmd the command line as typed
-     */
-    protected void addToHistory(int i, String cmd) {
-        history.put(i, cmd);
-    }
-
-    /**
-     * Parses and executes one line typed at the console. An empty line, or a
-     * line that yields no tokens (for example only spaces), is ignored: it is
-     * neither recorded in the history nor counted.
-     *
-     * @param line the raw input line
-     */
-    public void executeLine(String line) {
-        if (line.equals("")) {
-            return;
-        }
-        // When nothing can be parsed, cl still holds the previously parsed
-        // command; running it again would silently repeat that command.
-        if (!cl.parseCommand(line)) {
-            return;
-        }
-        addToHistory(commandCount, line);
-        processCmd(cl);
-        commandCount++;
-    }
-
-    /**
-     * Looks up and runs the command held by the given options object.
-     * Prints the general usage when the command is missing or unknown; in
-     * console mode it prints the elapsed time on success and the command's
-     * own usage on failure.
-     *
-     * @param co parsed command and arguments
-     * @return true when the command ran and reported success
-     */
-    protected boolean processCmd(MyCommandOptions co) {
-        final String[] argv = co.getArgArray();
-        final String name = co.getCommand();
-        // no tokens at all, or a name the command table does not know
-        if (argv.length < 1 || !getHedwigCommands().containsKey(name)) {
-            usage();
-            return false;
-        }
-
-        LOG.debug("Processing {}", name);
-
-        final MyCommand handler = myCommands.get(name);
-        if (null == handler) {
-            System.err.println("No Command Processor found for command " + name);
-            usage();
-            return false;
-        }
-
-        final long begin = MathUtils.now();
-        boolean ok;
-        try {
-            ok = handler.runCmd(argv);
-        } catch (Exception e) {
-            // a throwing command counts as a failed one
-            e.printStackTrace();
-            ok = false;
-        }
-        final long took = MathUtils.now() - begin;
-        if (!inConsole) {
-            return ok;
-        }
-        if (ok) {
-            System.out.println("Finished " + ((double)took / 1000) + " s.");
-            return true;
-        }
-        final COMMAND usageInfo = getHedwigCommands().get(name);
-        if (usageInfo != null) {
-            usageInfo.printUsage();
-        }
-        return false;
-    }
-
-    /**
-     * Runs the console. Without a command on the command line it starts an
-     * interactive jline session (with completion and a persistent history
-     * file) that lasts until the input ends; with a command it executes that
-     * single command. In both cases the EXIT command is run last.
-     *
-     * @throws IOException if the jline console cannot be created or read
-     */
-    @SuppressWarnings("unchecked")
-    void run() throws IOException {
-        inConsole = true;
-        myCommands = buildMyCommands();
-        if (cl.getCommand() == null) {
-            System.out.println("Welcome to Hedwig!");
-            System.out.println("JLine support is enabled");
-
-            console = new ConsoleReader();
-            JLineHedwigCompletor completor = new JLineHedwigCompletor(admin);
-            console.addCompletor(completor);
-
-            // load history file
-            History jlineHistory = new History();
-            File file = new File(System.getProperty("hw.history",
-                                 new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
-            LOG.debug("History file is {}", file);
-            jlineHistory.setHistoryFile(file);
-            // set history to console reader
-            console.setHistory(jlineHistory);
-            // load history from history file
-            jlineHistory.moveToFirstEntry();
-
-            while (jlineHistory.next()) {
-                String entry = jlineHistory.current();
-                if (!entry.equals("")) {
-                    addToHistory(commandCount, entry);
-                }
-                commandCount++;
-            }
-            System.out.println("JLine history support is enabled");
-
-            String line;
-            while ((line = console.readLine(getPrompt())) != null) {
-                executeLine(line);
-                jlineHistory.addToHistory(line);
-            }
-        } else {
-            // One-shot mode. This used to run after the interactive loop as well,
-            // which re-executed the last typed command (or failed when nothing
-            // had been typed) once the input ended.
-            inConsole = false;
-            processCmd(cl);
-        }
-
-        inConsole = false;
-        try {
-            myCommands.get(EXIT).runCmd(new String[0]);
-        } catch (Exception e) {
-            // best effort: the console is going away regardless
-            LOG.debug("Exit command failed", e);
-        }
-    }
-
-    /**
-     * Entry point: builds a console from the command line and runs it.
-     *
-     * @param args command line arguments
-     */
-    public static void main(String[] args) throws IOException, InterruptedException {
-        new HedwigConsole(args).run();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
deleted file mode 100644
index cdc0c33..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.hedwig.admin.HedwigAdmin;
-
-import com.google.protobuf.ByteString;
-
-import jline.Completor;
-
-import static org.apache.hedwig.admin.console.HedwigCommands.*;
-
-/**
- * A jline completor for hedwig console. Completes topic names for the
- * commands whose first argument is a topic (and for the describe-topic
- * form), and command names otherwise.
- */
-public class JLineHedwigCompletor implements Completor {
-    // for topic completion: at most this many topics are inspected per request
-    static final int MAX_TOPICS_TO_SEARCH = 1000;
-
-    private final HedwigAdmin admin;
-
-    public JLineHedwigCompletor(HedwigAdmin admin) {
-        this.admin = admin;
-    }
-
-    /**
-     * @param buffer     the current input line
-     * @param cursor     cursor position; only the text before it is considered
-     * @param candidates list that receives the possible completions
-     * @return index in the buffer at which the completions apply
-     */
-    @Override
-    public int complete(String buffer, int cursor, List candidates) {
-        // Guarantee that the final token is the one we're expanding
-        buffer = buffer.substring(0,cursor);
-        String[] tokens = buffer.split(" ");
-        if (buffer.endsWith(" ")) {
-            // split() drops the trailing empty token; put it back so that the
-            // token being completed is the (empty) last one
-            String[] newTokens = new String[tokens.length + 1];
-            System.arraycopy(tokens, 0, newTokens, 0, tokens.length);
-            newTokens[newTokens.length - 1] = "";
-            tokens = newTokens;
-        }
-
-        if (tokens.length > 2 &&
-            DESCRIBE.equalsIgnoreCase(tokens[0]) &&
-            DESCRIBE_TOPIC.equalsIgnoreCase(tokens[1])) {
-            return completeTopic(buffer, tokens[2], candidates);
-        } else if (tokens.length > 1 &&
-                   (SUB.equalsIgnoreCase(tokens[0]) ||
-                    PUB.equalsIgnoreCase(tokens[0]) ||
-                    CLOSESUB.equalsIgnoreCase(tokens[0]) ||
-                    CONSUME.equalsIgnoreCase(tokens[0]) ||
-                    CONSUMETO.equalsIgnoreCase(tokens[0]) ||
-                    READTOPIC.equalsIgnoreCase(tokens[0]))) {
-            return completeTopic(buffer, tokens[1], candidates);
-        }
-        List cmds = HedwigCommands.findCandidateCommands(tokens);
-        return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
-    }
-
-    /** Adds every command starting with the token to the candidates. */
-    @SuppressWarnings("unchecked")
-    private int completeCommand(String buffer, String token,
-            List commands, List candidates) {
-        for (Object cmdo : commands) {
-            assert (cmdo instanceof String);
-            if (((String)cmdo).startsWith(token)) {
-                candidates.add(cmdo);
-            }
-        }
-        return buffer.lastIndexOf(" ") + 1;
-    }
-
-    /**
-     * Adds every topic starting with the token to the candidates, inspecting
-     * at most {@link #MAX_TOPICS_TO_SEARCH} topics.
-     */
-    @SuppressWarnings("unchecked")
-    private int completeTopic(String buffer, String token, List candidates) {
-        try {
-            Iterator<ByteString> children = admin.getTopics();
-            // strictly less-than: the previous '<=' inspected one topic too many
-            for (int inspected = 0;
-                 inspected < MAX_TOPICS_TO_SEARCH && children.hasNext();
-                 ++inspected) {
-                String child = children.next().toStringUtf8();
-                if (child.startsWith(token)) {
-                    candidates.add(child);
-                }
-            }
-        } catch (Exception e) {
-            // topics unavailable: offer nothing and leave the cursor where it is
-            return buffer.length();
-        }
-        return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf(" ") + 1;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
deleted file mode 100644
index edad190..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.hedwig.admin.HedwigAdmin;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * A tool to read topic messages.
- *
- * This tool :
- * 1) reads persistence info from zookeeper: ledger ranges
- * 2) reads subscription info from zookeeper: we can know the least message id (ledger id)
- * 3) uses bk client to read messages starting from the least message id
- */
-public class ReadTopic {
-
-    final HedwigAdmin admin;
-    final ByteString topic;
-    // sequence id of the next message to read; advanced while reading
-    long startSeqId;
-    // smallest local message id found among the subscriptions; Long.MAX_VALUE until computed
-    long leastConsumedSeqId = Long.MAX_VALUE;
-    // when true, the user is prompted after each printed batch
-    final boolean inConsole;
-
-    static final int RC_OK = 0;
-    static final int RC_ERROR = -1;
-    static final int RC_NOTOPIC = -2;
-    static final int RC_NOLEDGERS = -3;
-    static final int RC_NOSUBSCRIBERS = -4;
-
-    // number of entries requested from the ledger per read / per prompt
-    static final int NUM_MESSAGES_TO_PRINT = 15;
-
-    List<LedgerRange> ledgers = new ArrayList<LedgerRange>();
-
-    /**
-     * Constructor, reading from message sequence id 1.
-     *
-     * @param admin admin handle used to query topic metadata and open ledgers
-     * @param topic topic to read
-     * @param inConsole whether to prompt the user between batches
-     */
-    public ReadTopic(HedwigAdmin admin, ByteString topic, boolean inConsole) {
-        this(admin, topic, 1, inConsole);
-    }
-
-    /**
-     * Constructor
-     *
-     * @param admin admin handle used to query topic metadata and open ledgers
-     * @param topic topic to read
-     * @param msgSeqId message sequence id to start reading from
-     * @param inConsole whether to prompt the user between batches
-     */
-    public ReadTopic(HedwigAdmin admin, ByteString topic, long msgSeqId, boolean inConsole) {
-        this.admin = admin;
-        this.topic = topic;
-        this.startSeqId = msgSeqId;
-        this.inConsole = inConsole;
-    }
-
-    /**
-     * Check whether the topic exists or not
-     *
-     * @return RC_OK if the topic exists; RC_NOTOPIC if not.
-     * @throws Exception
-     */
-    protected int checkTopic() throws Exception {
-        return admin.hasTopic(topic) ? RC_OK : RC_NOTOPIC;
-    }
-
-    /**
-     * Get the ledgers used by this topic to store messages
-     *
-     * @return RC_OK if topic has messages; RC_NOLEDGERS if not.
-     * @throws Exception
-     */
-    protected int getTopicLedgers() throws Exception {
-        List<LedgerRange> ranges = admin.getTopicLedgers(topic);
-        if (null == ranges || ranges.isEmpty()) {
-            return RC_NOLEDGERS;
-        }
-        ledgers.addAll(ranges);
-        return RC_OK;
-    }
-
-    /**
-     * Find the smallest local message id among the topic's subscriptions and
-     * store it in {@link #leastConsumedSeqId}.
-     *
-     * @return RC_OK if the topic has subscriptions; RC_NOSUBSCRIBERS if not.
-     * @throws Exception
-     */
-    protected int getLeastSubscription() throws Exception {
-        Map<ByteString, SubscriptionData> states = admin.getTopicSubscriptions(topic);
-        // guard against a missing map the same way getTopicLedgers guards its list
-        if (null == states || states.isEmpty()) {
-            return RC_NOSUBSCRIBERS;
-        }
-        for (Map.Entry<ByteString, SubscriptionData> entry : states.entrySet()) {
-            SubscriptionData state = entry.getValue();
-            long localMsgId = state.getState().getMsgId().getLocalComponent();
-            if (localMsgId < leastConsumedSeqId) {
-                leastConsumedSeqId = localMsgId;
-            }
-        }
-        if (leastConsumedSeqId == Long.MAX_VALUE) {
-            leastConsumedSeqId = 0;
-        }
-        return RC_OK;
-    }
-
-    /**
-     * Read and print the messages of the topic, reporting problems on stderr
-     * instead of propagating them.
-     */
-    public void readTopic() {
-        try {
-            int rc = _readTopic();
-            switch (rc) {
-            case RC_NOTOPIC:
-                System.err.println("No topic " + topicName() + " found.");
-                break;
-            case RC_NOLEDGERS:
-                System.err.println("No message is published to topic " + topicName());
-                break;
-            default:
-                break;
-            }
-        } catch (Exception e) {
-            System.err.println("ERROR: read messages of topic " + topicName() + " failed.");
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Render the topic for user-facing messages. Concatenating the ByteString
-     * itself would print its object description rather than the topic name.
-     */
-    private String topicName() {
-        return null == topic ? "null" : topic.toStringUtf8();
-    }
-
-    /**
-     * Check the topic, load its ledger ranges and subscriptions, then read the
-     * ledger ranges in order starting from {@link #startSeqId}.
-     *
-     * @return RC_OK after reading; otherwise the code of the first failed step.
-     * @throws Exception
-     */
-    protected int _readTopic() throws Exception {
-        int rc;
-        // check topic
-        rc = checkTopic();
-        if (RC_OK != rc) {
-            return rc;
-        }
-        // get topic ledgers
-        rc = getTopicLedgers();
-        if (RC_OK != rc) {
-            return rc;
-        }
-        // get topic subscription to find the least one
-        rc = getLeastSubscription();
-        if (RC_NOSUBSCRIBERS == rc) {
-            startSeqId = 1;
-        } else if (RC_OK == rc) {
-            if (leastConsumedSeqId > startSeqId) {
-                startSeqId = leastConsumedSeqId + 1;
-            }
-        } else {
-            return rc;
-        }
-
-        for (LedgerRange range : ledgers) {
-            long endSeqId = range.getEndSeqIdIncluded().getLocalComponent();
-            if (endSeqId < startSeqId) {
-                // the whole range lies before the first message to read
-                continue;
-            }
-            boolean toContinue = readLedger(range);
-            startSeqId = endSeqId + 1;
-            if (!toContinue) {
-                break;
-            }
-        }
-
-        return RC_OK;
-    }
-
-    /**
-     * Read a specific ledger, printing its messages in batches of
-     * NUM_MESSAGES_TO_PRINT. The ledger handle opened here is closed before
-     * returning, on every path.
-     *
-     * @param ledger in memory ledger range
-     * @return true if continue, otherwise false
-     * @throws BKException
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected boolean readLedger(LedgerRange ledger)
-    throws BKException, IOException, InterruptedException {
-        long tEndSeqId = ledger.getEndSeqIdIncluded().getLocalComponent();
-
-        if (tEndSeqId < this.startSeqId) {
-            return true;
-        }
-        // Open Ledger Handle
-        long ledgerId = ledger.getLedgerId();
-        System.out.println("\n>>>>> " + ledger + " <<<<<\n");
-        LedgerHandle lh = null;
-        try {
-            lh = admin.getBkHandle().openLedgerNoRecovery(ledgerId, admin.getBkDigestType(), admin.getBkPasswd());
-        } catch (BKException e) {
-            System.err.println("ERROR: No ledger " + ledgerId + " found. maybe garbage collected due to the messages are consumed.");
-        }
-        if (null == lh) {
-            return true;
-        }
-
-        // entry id = message sequence id - first sequence id stored in this ledger
-        long firstSeqId = ledger.getStartSeqIdIncluded();
-        long expectedEntryId = startSeqId - firstSeqId;
-        long correctedEndSeqId = tEndSeqId;
-        try {
-            while (startSeqId <= tEndSeqId) {
-                correctedEndSeqId = Math.min(startSeqId + NUM_MESSAGES_TO_PRINT - 1, tEndSeqId);
-
-                Enumeration<LedgerEntry> seq =
-                    lh.readEntries(startSeqId - firstSeqId, correctedEndSeqId - firstSeqId);
-                while (seq.hasMoreElements()) {
-                    LedgerEntry entry = seq.nextElement();
-                    Message message;
-                    try {
-                        message = Message.parseFrom(entry.getEntryInputStream());
-                    } catch (IOException e) {
-                        System.out.println("WARN: Unreadable message found\n");
-                        expectedEntryId++;
-                        continue;
-                    }
-                    if (expectedEntryId != entry.getEntryId()
-                        || (message.getMsgId().getLocalComponent() - firstSeqId) != expectedEntryId) {
-                        throw new IOException("ERROR: Message ids are out of order : expected entry id " + expectedEntryId
-                                            + ", current entry id " + entry.getEntryId() + ", msg seq id " + message.getMsgId().getLocalComponent());
-                    }
-                    expectedEntryId++;
-                    formatMessage(message);
-                }
-                startSeqId = correctedEndSeqId + 1;
-                if (inConsole && !pressKeyToContinue()) {
-                    return false;
-                }
-            }
-        } catch (BKException bke) {
-            // A read failure is only reported when the range has an explicit end;
-            // with an end of Long.MAX_VALUE the failure is ignored.
-            // NOTE(review): presumably that value marks a range still being written - confirm.
-            if (tEndSeqId != Long.MAX_VALUE) {
-                System.err.println("ERROR: ledger " + ledgerId + " may be corrupted, since read messages ["
-                                 + startSeqId + " ~ " + correctedEndSeqId + " ] failed :");
-                throw bke;
-            }
-        } finally {
-            closeLedger(lh, ledgerId);
-        }
-        System.out.println("\n");
-        return true;
-    }
-
-    /**
-     * Close a ledger handle without masking an exception already in flight.
-     */
-    private void closeLedger(LedgerHandle lh, long ledgerId) {
-        try {
-            lh.close();
-        } catch (BKException e) {
-            System.err.println("WARN: failed to close ledger " + ledgerId + " : " + e.getMessage());
-        } catch (InterruptedException e) {
-            // keep the interrupt visible to the caller
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Print the id, source region and body of a message to stdout.
-     *
-     * @param message message to print
-     */
-    protected void formatMessage(Message message) {
-        // print msg id
-        String msgId;
-        if (!message.hasMsgId()) {
-            msgId = "N/A";
-        } else {
-            MessageSeqId seqId = message.getMsgId();
-            StringBuilder idBuilder = new StringBuilder();
-            if (seqId.hasLocalComponent()) {
-                idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")");
-            } else {
-                List<RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
-                int i = 0, numRegions = remoteIds.size();
-                idBuilder.append("REMOTE(");
-                for (RegionSpecificSeqId rssid : remoteIds) {
-                    idBuilder.append(rssid.getRegion().toStringUtf8());
-                    idBuilder.append("[");
-                    idBuilder.append(rssid.getSeqId());
-                    idBuilder.append("]");
-                    ++i;
-                    if (i < numRegions) {
-                        idBuilder.append(",");
-                    }
-                }
-                idBuilder.append(")");
-            }
-            msgId = idBuilder.toString();
-        }
-        System.out.println("---------- MSGID=" + msgId + " ----------");
-        System.out.println("MsgId:     " + msgId);
-        // print source region
-        if (message.hasSrcRegion()) {
-            System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8());
-        } else {
-            System.out.println("SrcRegion: N/A");
-        }
-        // print message body
-        System.out.println("Message:");
-        System.out.println();
-        if (message.hasBody()) {
-            System.out.println(message.getBody().toStringUtf8());
-        } else {
-            System.out.println("N/A");
-        }
-        System.out.println();
-    }
-
-    /**
-     * Prompt the user and read one character from stdin.
-     *
-     * @return true if the character read is 'y' or 'Y', otherwise false
-     * @throws IOException
-     */
-    boolean pressKeyToContinue() throws IOException {
-        System.out.println("Press Y to continue...");
-        // A new reader is created for every prompt on purpose: do not hoist it.
-        // NOTE(review): presumably the discarded reader also swallows the rest of
-        // the typed line (e.g. the newline), so it is not taken as the next answer.
-        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, UTF_8));
-        int ch = stdin.read();
-        if (ch == 'y' ||
-            ch == 'Y') {
-            return true;
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java b/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
deleted file mode 100644
index 6ac6879..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.hedwig.data;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.bookkeeper.util.EntryFormatter;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Entry formatter that decodes an entry as a pub sub {@link Message} and
- * prints its id, source region and body to stdout.
- */
-public class MessageFormatter extends EntryFormatter {
-    private static final Logger logger = LoggerFactory.getLogger(MessageFormatter.class);
-
-    // configuration key naming the formatter class used for message bodies
-    static final String MESSAGE_PAYLOAD_FORMATTER_CLASS = "message_payload_formatter_class";
-
-    // formatter applied to the message body; plain string output until configured
-    EntryFormatter dataFormatter = EntryFormatter.STRING_FORMATTER;
-
-    @Override
-    public void setConf(Configuration conf) {
-        super.setConf(conf);
-        dataFormatter = EntryFormatter.newEntryFormatter(conf, MESSAGE_PAYLOAD_FORMATTER_CLASS);
-    }
-
-    /**
-     * Parse a message from the stream and print it; when parsing fails, warn
-     * and hand the stream to the plain string formatter instead.
-     */
-    @Override
-    public void formatEntry(java.io.InputStream input) {
-        final Message parsed;
-        try {
-            parsed = Message.parseFrom(input);
-        } catch (IOException unreadable) {
-            System.out.println("WARN: Unreadable message found\n");
-            EntryFormatter.STRING_FORMATTER.formatEntry(input);
-            return;
-        }
-        formatMessage(parsed);
-    }
-
-    /**
-     * Parse a message from the bytes and print it; when parsing fails, warn
-     * and hand the bytes to the plain string formatter instead.
-     */
-    @Override
-    public void formatEntry(byte[] data) {
-        final Message parsed;
-        try {
-            parsed = Message.parseFrom(data);
-        } catch (IOException unreadable) {
-            System.out.println("WARN: Unreadable message found\n");
-            EntryFormatter.STRING_FORMATTER.formatEntry(data);
-            return;
-        }
-        formatMessage(parsed);
-    }
-
-    /**
-     * Print the id, source region and body of a message to stdout.
-     */
-    void formatMessage(Message message) {
-        final String msgId = describeMsgId(message);
-        System.out.println("****** MSGID=" + msgId + " ******");
-        System.out.println("MessageId:      " + msgId);
-
-        // source region
-        String region = message.hasSrcRegion() ? message.getSrcRegion().toStringUtf8() : "N/A";
-        System.out.println("SrcRegion:      " + region);
-
-        // message body
-        if (!message.hasBody()) {
-            System.out.println("Body:           N/A");
-        } else {
-            System.out.println("Body:");
-            dataFormatter.formatEntry(message.getBody().toByteArray());
-        }
-        System.out.println();
-    }
-
-    /**
-     * Render the message id: "N/A" when absent, "LOCAL(id)" when a local
-     * component is present, otherwise "REMOTE(region[id],...)".
-     */
-    private static String describeMsgId(Message message) {
-        if (!message.hasMsgId()) {
-            return "N/A";
-        }
-        MessageSeqId seqId = message.getMsgId();
-        StringBuilder text = new StringBuilder();
-        if (seqId.hasLocalComponent()) {
-            text.append("LOCAL(");
-            text.append(seqId.getLocalComponent());
-            text.append(")");
-            return text.toString();
-        }
-        text.append("REMOTE(");
-        boolean first = true;
-        for (RegionSpecificSeqId remote : seqId.getRemoteComponentsList()) {
-            if (!first) {
-                text.append(",");
-            }
-            first = false;
-            text.append(remote.getRegion().toStringUtf8())
-                .append("[")
-                .append(remote.getSeqId())
-                .append("]");
-        }
-        text.append(")");
-        return text.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
deleted file mode 100644
index 21687eb..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-/**
- * Base class for benchmarks that issue a number of asynchronous operations
- * and report average latency and throughput per phase.
- */
-public abstract class AbstractBenchmark {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractBenchmark.class);
-
-    // sum of the latencies of all successful operations in the current phase
-    AtomicLong totalLatency = new AtomicLong();
-    // receives false when an operation fails, true when all operations of a phase are done
-    LinkedBlockingQueue<Boolean> doneSignalQueue = new LinkedBlockingQueue<Boolean>();
-
-    /** Issue <code>numOps</code> operations. */
-    abstract void doOps(int numOps) throws Exception;
-    /** Release the resources held by the benchmark. */
-    abstract void tearDown() throws Exception;
-
-    /**
-     * Completion handler shared by the benchmarks: releases one permit,
-     * accumulates latency and signals the end of the phase.
-     */
-    protected class AbstractCallback {
-        AtomicInteger numDone = new AtomicInteger(0);
-        Semaphore outstanding;
-        int numOps;
-        // progress logging is enabled with the "progress" system property
-        boolean logging;
-
-        public AbstractCallback(Semaphore outstanding, int numOps) {
-            this.outstanding = outstanding;
-            this.numOps = numOps;
-            logging = Boolean.getBoolean("progress");
-        }
-
-        /**
-         * Record the completion of one operation.
-         *
-         * @param success whether the operation succeeded
-         * @param ctx the operation's start time, as a Long
-         */
-        public void handle(boolean success, Object ctx) {
-            outstanding.release();
-
-            if (!success) {
-                ConcurrencyUtils.put(doneSignalQueue, false);
-                return;
-            }
-
-            totalLatency.addAndGet(MathUtils.now() - (Long)ctx);
-            int numDoneInt = numDone.incrementAndGet();
-
-            if (logging && numDoneInt % 10000 == 0) {
-                logger.info("Finished " + numDoneInt + " ops");
-            }
-
-            if (numOps == numDoneInt) {
-                ConcurrencyUtils.put(doneSignalQueue, true);
-            }
-        }
-    }
-
-    /**
-     * Run one phase: issue the operations, wait for the completion signal and
-     * log average latency and throughput.
-     *
-     * @param phase name of the phase, used in log messages
-     * @param numOps number of operations to issue; must be positive
-     * @throws IllegalArgumentException if <code>numOps</code> is not positive
-     * @throws RuntimeException if an operation reported failure
-     * @throws Exception if issuing the operations fails
-     */
-    public void runPhase(String phase, int numOps) throws Exception {
-        if (numOps <= 0) {
-            // the completion signal is only sent once numOps operations finished,
-            // so a non-positive count would block on the queue forever
-            throw new IllegalArgumentException("numOps must be positive in phase: " + phase + ", got " + numOps);
-        }
-        long startTime = MathUtils.now();
-
-        doOps(numOps);
-
-        if (!doneSignalQueue.take()) {
-            logger.error("One or more operations failed in phase: " + phase);
-            throw new RuntimeException("One or more operations failed in phase: " + phase);
-        }
-
-        // clamp to 1 so a phase finishing within one clock tick does not divide by zero
-        // NOTE(review): the factor 1000 assumes MathUtils.now() is in milliseconds - confirm
-        long elapsed = Math.max(1L, MathUtils.now() - startTime);
-        // multiply in long: numOps * 1000 overflows int for large operation counts
-        long throughput = numOps * 1000L / elapsed;
-        logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + throughput);
-    }
-
-    /**
-     * Run a warmup phase ("nWarmup" system property, default 50000), pause for
-     * 10 seconds, run the measured phase ("nOps", default 400000), then tear down.
-     */
-    public void run() throws Exception {
-
-        int numWarmup = Integer.getInteger("nWarmup", 50000);
-        runPhase("warmup", numWarmup);
-
-        logger.info("Sleeping for 10 seconds");
-        Thread.sleep(10000);
-        //reset latency
-        totalLatency.set(0);
-
-        int numOps = Integer.getInteger("nOps", 400000);
-        runPhase("real", numOps);
-
-        tearDown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
deleted file mode 100644
index d58883d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-/**
- * Benchmark that writes entries directly to a single bookie through a
- * {@link BookieClient}.
- */
-public class BookieBenchmark extends AbstractBenchmark {
-
-    // was created for BookkeeperBenchmark.class, which attributed log lines to the wrong class
-    private static final Logger logger = LoggerFactory.getLogger(BookieBenchmark.class);
-
-    BookieClient bkc;
-    BookieSocketAddress addr;
-    ClientSocketChannelFactory channelFactory;
-    OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
-            .name("BookieBenchmarkScheduler")
-            .numThreads(1)
-            .build();
-
-    /**
-     * @param bookieHostPort address of the bookie to write to, as host:port
-     * @throws IllegalArgumentException if the address is null or has no port part
-     */
-    public BookieBenchmark(String bookieHostPort)  throws Exception {
-        if (null == bookieHostPort) {
-            throw new IllegalArgumentException("Expected bookie address as host:port but got: null");
-        }
-        // resolve the address first, so that a malformed argument does not leave
-        // a channel factory and a client allocated but never released
-        String[] hostPort = bookieHostPort.split(":");
-        if (hostPort.length < 2) {
-            throw new IllegalArgumentException("Expected bookie address as host:port but got: " + bookieHostPort);
-        }
-        addr = new BookieSocketAddress(hostPort[0], Integer.parseInt(hostPort[1]));
-        channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        bkc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
-    }
-
-
-    /**
-     * Send <code>numOps</code> add-entry requests, keeping at most "nPars"
-     * (system property, default 1000) requests outstanding. The payload size
-     * is taken from the "size" system property (default 1024).
-     */
-    @Override
-    void doOps(final int numOps) throws Exception {
-        int numOutstanding = Integer.getInteger("nPars",1000);
-        final Semaphore outstanding = new Semaphore(numOutstanding);
-
-
-        WriteCallback callback = new WriteCallback() {
-            AbstractCallback handler = new AbstractCallback(outstanding, numOps);
-
-            @Override
-            public void writeComplete(int rc, long ledgerId, long entryId,
-            BookieSocketAddress addr, Object ctx) {
-                handler.handle(rc == BKException.Code.OK, ctx);
-            }
-        };
-
-        byte[] passwd = new byte[20];
-        int size = Integer.getInteger("size", 1024);
-        byte[] data = new byte[size];
-
-        for (int i=0; i<numOps; i++) {
-            // blocks while the maximum number of requests is in flight
-            outstanding.acquire();
-
-            // 44 bytes = three longs (24 bytes) followed by the 20-byte passwd
-            ByteBuffer buffer = ByteBuffer.allocate(44);
-            long ledgerId = 1000;
-            buffer.putLong(ledgerId);
-            buffer.putLong(i);
-            buffer.putLong(0);
-            buffer.put(passwd);
-            buffer.rewind();
-            ChannelBuffer toSend = ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer.slice()), ChannelBuffers.wrappedBuffer(data));
-            // the send time is passed as ctx so the callback can compute the latency
-            bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, MathUtils.now(), 0);
-        }
-
-    }
-
-    @Override
-    public void tearDown() {
-        bkc.close();
-        channelFactory.releaseExternalResources();
-        executor.shutdown();
-    }
-
-
-    /**
-     * Entry point.
-     *
-     * @param args args[0] is the bookie address, as host:port
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
-            throw new IllegalArgumentException("Usage: BookieBenchmark <bookie host:port>");
-        }
-        BookieBenchmark benchmark = new BookieBenchmark(args[0]);
-        benchmark.run();
-    }
-
-
-}