You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:28 UTC
[18/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
deleted file mode 100644
index 407769b..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigCommands.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.LinkedHashMap;
-
-/**
- * List all the available commands
- */
-public final class HedwigCommands {
-
- static final String[] EMPTY_ARRAY = new String[0];
-
- //
- // List all commands used to play with hedwig
- //
-
- /* PUB : publish a message to hedwig */
- static final String PUB = "pub";
- static final String PUB_DESC = "Publish a message to a topic in Hedwig";
- static final String[] PUB_USAGE = new String[] {
- "usage: pub {topic} {message}",
- "",
- " {topic} : topic name.",
- " any printable string without spaces.",
- " {message} : message body.",
- " remaining arguments are used as message body to publish.",
- };
-
- /* SUB : subscribe to a topic in hedwig for a specified subscriber */
- static final String SUB = "sub";
- static final String SUB_DESC = "Subscribe a topic for a specified subscriber";
- // Help text for "sub". The usage line lists [receive] because the option is
- // documented in the parameter list below.
- static final String[] SUB_USAGE = new String[] {
-     "usage: sub {topic} {subscriber} [mode] [receive]",
-     "",
-     " {topic} : topic name.",
-     " any printable string without spaces.",
-     " {subscriber} : subscriber id.",
-     " any printable string without spaces.",
-     " [mode] : mode to create subscription.",
-     " [receive] : bool. whether to start delivery to receive messages.",
-     "",
-     " available modes: (default value is 1)",
-     " 0 = CREATE: create subscription.",
-     " if the subscription already exists, it will fail.",
-     " 1 = ATTACH: attach to an existing subscription.",
-     " if the subscription does not exist, it will fail.",
-     " 2 = CREATE_OR_ATTACH:",
-     " attach to the subscription; if it does not exist, create one."
- };
-
- /* CLOSESUB : close the subscription of a subscriber for a topic */
- static final String CLOSESUB = "closesub";
- static final String CLOSESUB_DESC = "Close subscription of a subscriber to a specified topic";
- static final String[] CLOSESUB_USAGE = new String[] {
- "usage: closesub {topic} {subscriber}",
- "",
- " {topic} : topic name.",
- " any printable string without spaces.",
- " {subscriber} : subscriber id.",
- " any printable string without spaces.",
- "",
- " NOTE: this command just cleanup subscription states on client side.",
- " You can try UNSUB to clean subscription states on server side.",
- };
-
- /* UNSUB: unsubscribe of a subscriber to a topic */
- static final String UNSUB = "unsub";
- static final String UNSUB_DESC = "Unsubscribe a topic for a subscriber";
- static final String[] UNSUB_USAGE = new String[] {
- "usage: unsub {topic} {subscriber}",
- "",
- " {topic} : topic name.",
- " any printable string without spaces.",
- " {subscriber} : subscriber id.",
- " any printable string without spaces.",
- "",
- " NOTE: this command will cleanup subscription states on server side.",
- " You can try CLOSESUB to just clean subscription states on client side.",
- };
-
- static final String RMSUB = "rmsub";
- static final String RMSUB_DESC = "Remove subscriptions for topics";
- static final String[] RMSUB_USAGE = new String[] {
- "usage: rmsub {topic_prefix} {start_topic} {end_topic} {subscriber_prefix} {start_sub} {end_sub}",
- "",
- " {topic_prefix} : topic prefix.",
- " {start_topic} : start topic id.",
- " {end_topic} : end topic id.",
- " {subscriber_prefix} : subscriber prefix.",
- " {start_sub} : start subscriber id.",
- " {end_sub} : end subscriber id.",
- };
-
- /* CONSUME: move consume ptr of a subscription with specified steps */
- static final String CONSUME = "consume";
- static final String CONSUME_DESC = "Move consume ptr of a subscription with specified steps";
- // Help text for "consume". The NOTE warns that the pointer stored in
- // zookeeper can lag behind the hub's in-memory value (i.e. be stale).
- static final String[] CONSUME_USAGE = new String[] {
-     "usage: consume {topic} {subscriber} {nmsgs}",
-     "",
-     " {topic} : topic name.",
-     " any printable string without spaces.",
-     " {subscriber} : subscriber id.",
-     " any printable string without spaces.",
-     " {nmsgs} : how many messages to move consume ptr.",
-     "",
-     " Example:",
-     " suppose, from zk we know subscriber B consumed topic T to message 10",
-     " [hedwig: (standalone) 1] consume T B 2",
-     " after executed above command, a consume(10+2) request will be sent to hedwig.",
-     "",
-     " NOTE:",
-     " since Hedwig updates subscription consume ptr lazily, so you need to know that",
-     " 1) the consumption ptr read from zookeeper may be stale; ",
-     " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
- };
-
- /* CONSUMETO: move consume ptr of a subscription to a specified pos */
- static final String CONSUMETO = "consumeto";
- static final String CONSUMETO_DESC = "Move consume ptr of a subscription to a specified message id";
- // Help text for "consumeto". The NOTE warns that the pointer stored in
- // zookeeper can lag behind the hub's in-memory value (i.e. be stale).
- static final String[] CONSUMETO_USAGE = new String[] {
-     "usage: consumeto {topic} {subscriber} {msg_id}",
-     "",
-     " {topic} : topic name.",
-     " any printable string without spaces.",
-     " {subscriber} : subscriber id.",
-     " any printable string without spaces.",
-     " {msg_id} : message id that consume ptr will be moved to.",
-     " if the message id is less than current consume ptr,",
-     " hedwig will do nothing.",
-     "",
-     " Example:",
-     " suppose, from zk we know subscriber B consumed topic T to message 10",
-     " [hedwig: (standalone) 1] consumeto T B 12",
-     " after executed above command, a consume(12) request will be sent to hedwig.",
-     "",
-     " NOTE:",
-     " since Hedwig updates subscription consume ptr lazily, so you need to know that",
-     " 1) the consumption ptr read from zookeeper may be stale; ",
-     " 2) after sent the consume request, hedwig may just move ptr in its memory and lazily update it to zookeeper. you may not see the ptr changed when DESCRIBE the topic.",
- };
-
- /* PUBSUB: a health check command to ensure cluster is running */
- static final String PUBSUB = "pubsub";
- static final String PUBSUB_DESC = "A health check command to ensure hedwig is in running state";
- // Help text for "pubsub". The example message is spelled the same way in the
- // sample command line and in the step-by-step description.
- static final String[] PUBSUB_USAGE = new String[] {
-     "usage: pubsub {topic} {subscriber} {timeout_secs} {message}",
-     "",
-     " {topic} : topic name.",
-     " any printable string without spaces.",
-     " {subscriber} : subscriber id.",
-     " any printable string without spaces.",
-     " {timeout_secs} : how long will the subscriber wait for published message.",
-     " {message} : message body.",
-     " remaining arguments are used as message body to publish.",
-     "",
-     " Example:",
-     " [hedwig: (standalone) 1] pubsub TOPIC SUBID 10 TEST_MESSAGES",
-     "",
-     " 1) hw will subscribe topic TOPIC as subscriber SUBID;",
-     " 2) subscriber SUBID will wait a message until 10 seconds;",
-     " 3) hw publishes TEST_MESSAGES to topic TOPIC;",
-     " 4) if subscriber received a message in 10 secs, it checks whether the message is the published message.",
-     " if true, it will return SUCCESS, otherwise return FAILED.",
- };
-
- //
- // List all commands used to admin hedwig
- //
-
- /* SHOW: list all available hub servers or topics */
- static final String SHOW = "show";
- static final String SHOW_DESC = "list all available hub servers or topics";
- // Help text for "show" and its two sub commands (topics, hubs).
- static final String[] SHOW_USAGE = new String[] {
-     "usage: show [topics | hubs]",
-     "",
-     " show topics :",
-     " listing all available topics in hedwig.",
-     "",
-     " show hubs :",
-     " listing all available hubs in hedwig.",
-     "",
-     " NOTES:",
-     " 'show topics' will not work when there are millions of topics in hedwig, since we have packetLen limitation fetching data from zookeeper.",
- };
-
- static final String SHOW_TOPICS = "topics";
- static final String SHOW_HUBS = "hubs";
-
- /* DESCRIBE: show the metadata of a topic */
- static final String DESCRIBE = "describe";
- static final String DESCRIBE_DESC = "show metadata of a topic, including topic owner, persistence info, subscriptions info";
- static final String[] DESCRIBE_USAGE = new String[] {
- "usage: describe topic {topic}",
- "",
- " {topic} : topic name.",
- " any printable string without spaces.",
- "",
- " Example: describe topic ttttt",
- "",
- " Output:",
- " ===== Topic Information : ttttt =====",
- "",
- " Owner : 98.137.99.27:9875:9876",
- "",
- " >>> Persistence Info <<<",
- " Ledger 54729 [ 1 ~ 59 ]",
- " Ledger 54731 [ 60 ~ 60 ]",
- " Ledger 54733 [ 61 ~ 61 ]",
- "",
- " >>> Subscription Info <<<",
- " Subscriber mysub : consumeSeqId: local:50",
- };
-
- static final String DESCRIBE_TOPIC = "topic";
-
- /* READTOPIC: read messages of a specified topic */
- static final String READTOPIC = "readtopic";
- static final String READTOPIC_DESC = "read messages of a specified topic";
- static final String[] READTOPIC_USAGE = new String[] {
- "usage: readtopic {topic} [start_msg_id]",
- "",
- " {topic} : topic name.",
- " any printable string without spaces.",
- " [start_msg_id] : message id that start to read from.",
- "",
- " no start_msg_id provided:",
- " it will start from least_consumed_message_id + 1.",
- " least_consume_message_id is computed from all its subscribers.",
- "",
- " start_msg_id provided:",
- " it will start from MAX(start_msg_id, least_consumed_message_id).",
- "",
- " MESSAGE FORMAT:",
- "",
- " ---------- MSGID=LOCAL(51) ----------",
- " MsgId: LOCAL(51)",
- " SrcRegion: standalone",
- " Message:",
- "",
- " hello",
- };
-
- /* FORMAT: format metadata for Hedwig */
- static final String FORMAT = "format";
- static final String FORMAT_DESC = "format metadata for Hedwig";
- static final String[] FORMAT_USAGE = new String[] {
- "usage: format [-force]",
- "",
- " [-force] : Format metadata for Hedwig w/o confirmation.",
- };
-
-
- //
- // List other useful commands
- //
-
- /* SET: set whether printing zk watches or not */
- static final String SET = "set";
- static final String SET_DESC = "set whether printing zk watches or not";
- static final String[] SET_USAGE = EMPTY_ARRAY;
-
- /* HISTORY: list history commands */
- static final String HISTORY = "history";
- static final String HISTORY_DESC = "list history commands";
- static final String[] HISTORY_USAGE = EMPTY_ARRAY;
-
- /* REDO: redo previous command */
- static final String REDO = "redo";
- static final String REDO_DESC = "redo history command";
- static final String[] REDO_USAGE = new String[] {
- "usage: redo [{cmdno} | !]",
- "",
- " {cmdno} : history command no.",
- " ! : last command.",
- };
-
- /* HELP: print usage information of a specified command */
- static final String HELP = "help";
- static final String HELP_DESC = "print usage information of a specified command";
- static final String[] HELP_USAGE = new String[] {
- "usage: help {command}",
- "",
- " {command} : command name",
- };
-
- static final String QUIT = "quit";
- static final String QUIT_DESC = "exit console";
- static final String[] QUIT_USAGE = EMPTY_ARRAY;
-
- static final String EXIT = "exit";
- static final String EXIT_DESC = QUIT_DESC;
- static final String[] EXIT_USAGE = EMPTY_ARRAY;
-
- /**
-  * Every console command (and sub command) together with its name,
-  * one-line description, usage text and registered sub commands.
-  */
- public enum COMMAND {
-
-     CMD_PUB (PUB, PUB_DESC, PUB_USAGE),
-     CMD_SUB (SUB, SUB_DESC, SUB_USAGE),
-     CMD_CLOSESUB (CLOSESUB, CLOSESUB_DESC, CLOSESUB_USAGE),
-     CMD_UNSUB (UNSUB, UNSUB_DESC, UNSUB_USAGE),
-     CMD_RMSUB (RMSUB, RMSUB_DESC, RMSUB_USAGE),
-     CMD_CONSUME (CONSUME, CONSUME_DESC, CONSUME_USAGE),
-     CMD_CONSUMETO (CONSUMETO, CONSUMETO_DESC, CONSUMETO_USAGE),
-     CMD_PUBSUB (PUBSUB, PUBSUB_DESC, PUBSUB_USAGE),
-     CMD_SHOW (SHOW, SHOW_DESC, SHOW_USAGE),
-     CMD_DESCRIBE (DESCRIBE, DESCRIBE_DESC, DESCRIBE_USAGE),
-     CMD_READTOPIC (READTOPIC, READTOPIC_DESC, READTOPIC_USAGE),
-     CMD_FORMAT (FORMAT, FORMAT_DESC, FORMAT_USAGE),
-     CMD_SET (SET, SET_DESC, SET_USAGE),
-     CMD_HISTORY (HISTORY, HISTORY_DESC, HISTORY_USAGE),
-     CMD_REDO (REDO, REDO_DESC, REDO_USAGE),
-     CMD_HELP (HELP, HELP_DESC, HELP_USAGE),
-     CMD_QUIT (QUIT, QUIT_DESC, QUIT_USAGE),
-     CMD_EXIT (EXIT, EXIT_DESC, EXIT_USAGE),
-     // sub commands
-     CMD_SHOW_TOPICS (SHOW_TOPICS, "", EMPTY_ARRAY),
-     CMD_SHOW_HUBS (SHOW_HUBS, "", EMPTY_ARRAY),
-     CMD_DESCRIBE_TOPIC (DESCRIBE_TOPIC, "", EMPTY_ARRAY);
-
-     // command word as typed on the console
-     protected String name;
-     // one-line description printed in front of the usage text
-     protected String desc;
-     // usage text, one array element per output line
-     protected String[] usage;
-     // sub commands keyed by name, in registration order
-     protected Map<String, COMMAND> subCmds;
-
-     COMMAND(String name, String desc, String[] usage) {
-         this.subCmds = new LinkedHashMap<String, COMMAND>();
-         this.usage = usage;
-         this.desc = desc;
-         this.name = name;
-     }
-
-     /** @return the command word */
-     public String getName() {
-         return name;
-     }
-
-     /** @return the one-line description */
-     public String getDescription() {
-         return desc;
-     }
-
-     /** @return the live map of sub commands registered on this command */
-     public Map<String, COMMAND> getSubCommands() {
-         return subCmds;
-     }
-
-     /** Register c as a sub command of this command, keyed by its name. */
-     public void addSubCommand(COMMAND c) {
-         subCmds.put(c.name, c);
-     }
-
-     /** Print "name: description", the usage lines and a blank line to stderr. */
-     public void printUsage() {
-         System.err.println(name + ": " + desc);
-         for (int i = 0; i < usage.length; i++) {
-             System.err.println(usage[i]);
-         }
-         System.err.println();
-     }
- }
-
- // Top-level command table keyed by command name; stays null until init()
- // assigns it.
- static Map<String, COMMAND> commands = null;
-
- // Register c in the top-level command table under its name.
- // Dereferences 'commands', so it must only run after init() has created the map.
- private static void addCommand(COMMAND c) {
- commands.put(c.getName(), c);
- }
-
- static synchronized void init() {
- if (commands != null) {
- return;
- }
- commands = new LinkedHashMap<String, COMMAND>();
-
- addCommand(COMMAND.CMD_PUB);
- addCommand(COMMAND.CMD_SUB);
- addCommand(COMMAND.CMD_CLOSESUB);
- addCommand(COMMAND.CMD_UNSUB);
- addCommand(COMMAND.CMD_RMSUB);
- addCommand(COMMAND.CMD_CONSUME);
- addCommand(COMMAND.CMD_CONSUMETO);
- addCommand(COMMAND.CMD_PUBSUB);
-
- // show
- COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_TOPICS);
- COMMAND.CMD_SHOW.addSubCommand(COMMAND.CMD_SHOW_HUBS);
- addCommand(COMMAND.CMD_SHOW);
-
- // describe
- COMMAND.CMD_DESCRIBE.addSubCommand(COMMAND.CMD_DESCRIBE_TOPIC);
- addCommand(COMMAND.CMD_DESCRIBE);
-
- addCommand(COMMAND.CMD_READTOPIC);
- addCommand(COMMAND.CMD_FORMAT);
- addCommand(COMMAND.CMD_SET);
- addCommand(COMMAND.CMD_HISTORY);
- addCommand(COMMAND.CMD_REDO);
- addCommand(COMMAND.CMD_HELP);
- addCommand(COMMAND.CMD_QUIT);
- addCommand(COMMAND.CMD_EXIT);
- }
-
- /**
-  * @return the top-level command table itself (not a copy); null if init()
-  *         has not been called yet
-  */
- public static Map<String, COMMAND> getHedwigCommands() {
- return commands;
- }
-
- /**
-  * Find candidate commands by the specified token list.
-  *
-  * Every token except the last is looked up as a command path: the first
-  * in the top-level command table, each following one among the sub
-  * commands of the previous match. The names available at the level reached
-  * are returned; the last token itself is not inspected.
-  *
-  * @param tokens token list
-  *
-  * @return list of candidate commands; empty when a path token is unknown,
-  *         when the matched command has no sub commands, when tokens is
-  *         null, or when the command table has not been initialized
-  */
- public static List<String> findCandidateCommands(String[] tokens) {
-     List<String> cmds = new LinkedList<String>();
-
-     Map<String, COMMAND> cmdMap = commands;
-     // init() may not have run yet, in which case there are no candidates
-     if (cmdMap == null || tokens == null) {
-         return cmds;
-     }
-     for (int i = 0; i < tokens.length - 1; i++) {
-         COMMAND c = cmdMap.get(tokens[i]);
-         // unknown command, or nothing to descend into
-         if (c == null || c.getSubCommands().isEmpty()) {
-             return cmds;
-         }
-         cmdMap = c.getSubCommands();
-     }
-     cmds.addAll(cmdMap.keySet());
-     return cmds;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
deleted file mode 100644
index 3a5ef5a..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
+++ /dev/null
@@ -1,1038 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import jline.ConsoleReader;
-import jline.History;
-import jline.Terminal;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.admin.HedwigAdmin;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.util.SubscriptionListener;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-import static com.google.common.base.Charsets.UTF_8;
-
-import static org.apache.hedwig.admin.console.HedwigCommands.*;
-import static org.apache.hedwig.admin.console.HedwigCommands.COMMAND.*;
-
-/**
- * Console Client to Hedwig
- */
-public class HedwigConsole {
- private static final Logger LOG = LoggerFactory.getLogger(HedwigConsole.class);
- // NOTE: now it is fixed passwd in bookkeeper
- static byte[] passwd = "sillysecret".getBytes(UTF_8);
-
- // history file name
- static final String HW_HISTORY_FILE = ".hw_history";
-
- static final char[] CONTINUE_OR_QUIT = new char[] { 'Q', 'q', '\n' };
-
- protected MyCommandOptions cl = new MyCommandOptions();
- protected HashMap<Integer, String> history = new LinkedHashMap<Integer, String>();
- protected int commandCount = 0;
- protected boolean printWatches = true;
- protected Map<String, MyCommand> myCommands;
-
- protected boolean inConsole = true;
- protected ConsoleReader console = null;
-
- protected HedwigAdmin admin;
- protected HedwigClient hubClient;
- protected Publisher publisher;
- protected Subscriber subscriber;
- protected ConsoleMessageHandler consoleHandler =
- new ConsoleMessageHandler();
- protected Terminal terminal;
-
- protected String myRegion;
-
- /**
-  * Handler for one console command.
-  */
- interface MyCommand {
- // args: the command tokens; the implementations in this file read their
- // first argument from args[1].
- // The implementations in this file return false when arguments are missing
- // or invalid. NOTE(review): presumably the caller reacts to false by
- // printing usage -- confirm against the dispatching code.
- boolean runCmd(String[] args) throws Exception;
- }
-
- /** "help": print the usage of one command, or the general usage. */
- static class HelpCmd implements MyCommand {
-
-     /**
-      * Prints the usage of the command named by args[1] when it is a known
-      * top-level command; otherwise prints the general usage. Always
-      * returns true.
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         // only consult the command table when a command name was given
-         COMMAND target = (args.length >= 2) ? getHedwigCommands().get(args[1]) : null;
-         if (target == null) {
-             usage();
-         } else {
-             target.printUsage();
-         }
-         return true;
-     }
- }
-
- /** "quit" / "exit": close the clients and terminate the JVM. */
- class ExitCmd implements MyCommand {
-
-     /**
-      * Announces the shutdown, closes the hub client and the admin client,
-      * then exits the JVM with status 0.
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         printMessage("Quitting ...");
-         hubClient.close();
-         admin.close();
-         // System.exit delegates to Runtime.getRuntime().exit
-         System.exit(0);
-         // not reached at runtime; required by the compiler
-         return true;
-     }
- }
-
- class RedoCmd implements MyCommand {
-
- @Override
- public boolean runCmd(String[] args) throws Exception {
- if (args.length < 2) {
- return false;
- }
-
- int index;
- if ("!".equals(args[1])) {
- index = commandCount - 1;
- } else {
- index = Integer.decode(args[1]);
- if (commandCount <= index) {
- System.err.println("Command index out of range");
- return false;
- }
- }
- cl.parseCommand(history.get(index));
- if (cl.getCommand().equals("redo")) {
- System.err.println("No redoing redos");
- return false;
- }
- history.put(commandCount, history.get(index));
- processCmd(cl);
- return true;
- }
-
- }
-
- class HistoryCmd implements MyCommand {
-
- @Override
- public boolean runCmd(String[] args) throws Exception {
- for (int i=commandCount - 10; i<=commandCount; ++i) {
- if (i < 0) {
- continue;
- }
- System.out.println(i + " - " + history.get(i));
- }
- return true;
- }
-
- }
-
- /** "set": show or change whether zk watches are printed. */
- class SetCmd implements MyCommand {
-
-     /**
-      * "set printwatches" prints the current setting;
-      * "set printwatches on" enables printing and any other value disables it.
-      *
-      * @return false when args[1] is missing or is not "printwatches"
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         // The guard must be "< 2": with "< 3" the two-token query form below
-         // could never be reached.
-         if (args.length < 2 || !"printwatches".equals(args[1])) {
-             return false;
-         }
-         if (args.length == 2) {
-             System.out.println("printwatches is " + (printWatches ? "on" : "off"));
-         } else {
-             printWatches = "on".equals(args[2]);
-         }
-         return true;
-     }
-
- }
-
- /** "pub": publish a message to a topic. */
- class PubCmd implements MyCommand {
-
-     /**
-      * Publishes args[2..] joined by single spaces to the topic args[1].
-      * Prints "PUB DONE" on success, or "PUB FAILED" plus the stack trace.
-      *
-      * @return false when fewer than three tokens were given, true otherwise
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 3) {
-             return false;
-         }
-         ByteString topic = ByteString.copyFromUtf8(args[1]);
-
-         // first body token, then every further token prefixed by a blank
-         StringBuilder body = new StringBuilder(args[2]);
-         for (int pos = 3; pos < args.length; pos++) {
-             body.append(' ').append(args[pos]);
-         }
-         Message msg = Message.newBuilder()
-             .setBody(ByteString.copyFromUtf8(body.toString()))
-             .build();
-         try {
-             publisher.publish(topic, msg);
-             System.out.println("PUB DONE");
-         } catch (Exception e) {
-             System.err.println("PUB FAILED");
-             e.printStackTrace();
-         }
-         return true;
-     }
-
- }
-
- /** Message handler that prints each delivered message to stdout. */
- static class ConsoleMessageHandler implements MessageHandler {
-
-     /**
-      * Prints topic, subscriber id and message body (all decoded as UTF-8),
-      * then reports completion through the callback with the given context.
-      */
-     @Override
-     public void deliver(ByteString topic, ByteString subscriberId,
-                         Message msg, Callback<Void> callback, Object context) {
-         String topicName = topic.toStringUtf8();
-         String subscriberName = subscriberId.toStringUtf8();
-         String body = msg.getBody().toStringUtf8();
-         System.out.println("Received message from topic " + topicName
-             + " for subscriber " + subscriberName + " : " + body);
-         callback.operationFinished(context, null);
-     }
-
- }
-
- static class ConsoleSubscriptionListener implements SubscriptionListener {
-
- @Override
- public void processEvent(ByteString t, ByteString s, SubscriptionEvent event) {
- System.out.println("Subscription Channel for (topic:" + t.toStringUtf8() + ", subscriber:"
- + s.toStringUtf8() + ") received event : " + event);
- }
- }
-
- /** "sub": subscribe a subscriber to a topic and optionally start delivery. */
- class SubCmd implements MyCommand {
-
-     /**
-      * args[1] is the topic and args[2] the subscriber id. The optional
-      * args[3] is the numeric CreateOrAttach mode (ATTACH when omitted) and
-      * the optional args[4] says whether to start delivery (true when
-      * omitted; anything other than "true", ignoring case, means false).
-      *
-      * @return false when topic or subscriber is missing or the mode is not
-      *         a known mode number; true otherwise, even when subscribing
-      *         fails (the failure is printed)
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 3) {
-             return false;
-         }
-         CreateOrAttach mode = CreateOrAttach.ATTACH;
-         boolean receive = true;
-         if (args.length > 3) {
-             try {
-                 // valueOf yields null for a number that is not a known mode
-                 mode = CreateOrAttach.valueOf(Integer.parseInt(args[3]));
-             } catch (NumberFormatException e) {
-                 mode = null;
-             }
-             if (mode == null) {
-                 System.err.println("Unknown mode : " + args[3]);
-                 return false;
-             }
-             if (args.length >= 5) {
-                 // parseBoolean never throws, so no try/catch is needed
-                 receive = Boolean.parseBoolean(args[4]);
-             }
-         }
-         ByteString topic = ByteString.copyFromUtf8(args[1]);
-         ByteString subId = ByteString.copyFromUtf8(args[2]);
-         try {
-             SubscriptionOptions options =
-                 SubscriptionOptions.newBuilder().setCreateOrAttach(mode)
-                 .setForceAttach(false).build();
-             subscriber.subscribe(topic, subId, options);
-             if (receive) {
-                 subscriber.startDelivery(topic, subId, consoleHandler);
-                 System.out.println("SUB DONE AND RECEIVE");
-             } else {
-                 System.out.println("SUB DONE BUT NOT RECEIVE");
-             }
-         } catch (Exception e) {
-             System.err.println("SUB FAILED");
-             e.printStackTrace();
-         }
-         return true;
-     }
- }
-
- /** "unsub": stop delivery and unsubscribe a subscriber from a topic. */
- class UnsubCmd implements MyCommand {
-
-     /**
-      * Stops delivery for (args[1], args[2]) and then unsubscribes. Prints
-      * "UNSUB DONE" on success, or "UNSUB FAILED" plus the stack trace.
-      *
-      * @return false when topic or subscriber is missing, true otherwise
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 3) {
-             return false;
-         }
-         final ByteString topicName = ByteString.copyFromUtf8(args[1]);
-         final ByteString subscriberName = ByteString.copyFromUtf8(args[2]);
-         try {
-             subscriber.stopDelivery(topicName, subscriberName);
-             subscriber.unsubscribe(topicName, subscriberName);
-             System.out.println("UNSUB DONE");
-         } catch (Exception failure) {
-             System.err.println("UNSUB FAILED");
-             failure.printStackTrace();
-         }
-         return true;
-     }
-
- }
-
- /** "rmsub": remove subscriptions for ranges of topics and subscribers. */
- class RmsubCmd implements MyCommand {
-
-     /**
-      * For every topic {topic_prefix}{i}, i in [start_topic, end_topic], and
-      * every subscriber {subscriber_prefix}{j}, j in [start_sub, end_sub]:
-      * subscribe with CREATE_OR_ATTACH and unsubscribe straight away.
-      * A failure aborts the remaining subscribers of that topic only.
-      *
-      * @return false when arguments are missing or a range is reversed
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 7) {
-             return false;
-         }
-         final String topicPrefix = args[1];
-         final int firstTopic = Integer.parseInt(args[2]);
-         final int lastTopic = Integer.parseInt(args[3]);
-         final String subPrefix = args[4];
-         final int firstSub = Integer.parseInt(args[5]);
-         final int lastSub = Integer.parseInt(args[6]);
-         if (firstTopic > lastTopic || firstSub > lastSub) {
-             return false;
-         }
-         for (int t = firstTopic; t <= lastTopic; t++) {
-             ByteString topic = ByteString.copyFromUtf8(topicPrefix + t);
-             String topicName = topic.toStringUtf8();
-             try {
-                 for (int s = firstSub; s <= lastSub; s++) {
-                     ByteString sub = ByteString.copyFromUtf8(subPrefix + s);
-                     SubscriptionOptions.Builder builder = SubscriptionOptions.newBuilder();
-                     builder.setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH);
-                     subscriber.subscribe(topic, sub, builder.build());
-                     subscriber.unsubscribe(topic, sub);
-                 }
-                 System.out.println("RMSUB " + topicName + " DONE");
-             } catch (Exception failure) {
-                 System.err.println("RMSUB " + topicName + " FAILED");
-                 failure.printStackTrace();
-             }
-         }
-         return true;
-     }
-
- }
-
- /** "closesub": stop delivery and close a subscription. */
- class CloseSubscriptionCmd implements MyCommand {
-
-     /**
-      * Stops delivery for (args[1], args[2]) and closes the subscription.
-      * On failure prints "CLOSESUB FAILED" followed by the stack trace.
-      *
-      * @return false when topic or subscriber is missing, true otherwise
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 3) {
-             return false;
-         }
-         ByteString topic = ByteString.copyFromUtf8(args[1]);
-         ByteString subId = ByteString.copyFromUtf8(args[2]);
-
-         try {
-             subscriber.stopDelivery(topic, subId);
-             subscriber.closeSubscription(topic, subId);
-         } catch (Exception e) {
-             System.err.println("CLOSESUB FAILED");
-             // keep the cause visible, like the other commands of this console
-             e.printStackTrace();
-         }
-         return true;
-     }
-
- }
-
- /** "consumeto": move the consume pointer of a subscription to a message id. */
- class ConsumeToCmd implements MyCommand {
-
-     /**
-      * Sends a consume request for topic args[1] and subscriber args[2] with
-      * the local message id args[3]. On failure prints "CONSUMETO FAILED"
-      * followed by the stack trace.
-      *
-      * @return false when an argument is missing, true otherwise
-      * @throws NumberFormatException if args[3] is not a long
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 4) {
-             return false;
-         }
-         ByteString topic = ByteString.copyFromUtf8(args[1]);
-         ByteString subId = ByteString.copyFromUtf8(args[2]);
-         long msgId = Long.parseLong(args[3]);
-         MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(msgId).build();
-         try {
-             subscriber.consume(topic, subId, consumeId);
-         } catch (Exception e) {
-             System.err.println("CONSUMETO FAILED");
-             // keep the cause visible, like the other commands of this console
-             e.printStackTrace();
-         }
-         return true;
-     }
-
- }
-
- /** "consume": advance the consume pointer of a subscription by N messages. */
- class ConsumeCmd implements MyCommand {
-
-     /**
-      * Reads the subscription of topic args[1] and subscriber args[2] through
-      * the admin client, adds args[3] to its local message id and sends a
-      * consume request for the result. On failure prints "CONSUME FAILED"
-      * followed by the stack trace.
-      *
-      * @return false when an argument is missing, true otherwise (also when
-      *         the subscription cannot be read)
-      * @throws NumberFormatException if args[3] is not a long
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 4) {
-             return false;
-         }
-         // built once and used for both the lookup and the consume request
-         ByteString topic = ByteString.copyFromUtf8(args[1]);
-         ByteString subId = ByteString.copyFromUtf8(args[2]);
-
-         SubscriptionData subData = admin.getSubscription(topic, subId);
-         if (null == subData) {
-             System.err.println("Failed to read subscription for topic: " + args[1]
-                 + " subscriber: " + args[2]);
-             return true;
-         }
-         long lastConsumedId = subData.getState().getMsgId().getLocalComponent();
-         long numMessagesToConsume = Long.parseLong(args[3]);
-         long idToConsumed = lastConsumedId + numMessagesToConsume;
-         System.out.println("Try to move subscriber(" + args[2] + ") consume ptr of topic(" + args[1]
-             + ") from " + lastConsumedId + " to " + idToConsumed);
-         MessageSeqId consumeId = MessageSeqId.newBuilder().setLocalComponent(idToConsumed).build();
-         try {
-             subscriber.consume(topic, subId, consumeId);
-         } catch (Exception e) {
-             System.err.println("CONSUME FAILED");
-             // keep the cause visible, like the other commands of this console
-             e.printStackTrace();
-         }
-         return true;
-     }
-
- }
-
- /** "pubsub": publish a message and check that a fresh subscriber receives it. */
- class PubSubCmd implements MyCommand {
-
-     /**
-      * Subscribes to topic args[1] as "args[2]-{start time}", publishes
-      * args[4..] (joined by blanks, with "-{start time}" appended), starts
-      * delivery and waits up to args[3] seconds for that exact message.
-      * The subscription is removed again before returning.
-      *
-      * @return true if the message arrived in time; false when arguments are
-      *         missing or the check failed for any reason (failures are
-      *         printed and not thrown)
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 5) {
-             return false;
-         }
-         final long startTime = MathUtils.now();
-
-         final ByteString topic = ByteString.copyFromUtf8(args[1]);
-         final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime);
-         int timeoutSecs = 60;
-         try {
-             timeoutSecs = Integer.parseInt(args[3]);
-         } catch (NumberFormatException nfe) {
-             // keep the default, but tell the user that the input was ignored
-             System.err.println("Invalid timeout '" + args[3] + "', using "
-                 + timeoutSecs + " seconds");
-         }
-
-         StringBuilder sb = new StringBuilder();
-         for (int i = 4; i < args.length; i++) {
-             sb.append(args[i]);
-             if (i != args.length - 1) {
-                 sb.append(' ');
-             }
-         }
-         // append a timestamp tag
-         ByteString msgBody = ByteString.copyFromUtf8(sb.toString() + "-" + startTime);
-         final Message msg = Message.newBuilder().setBody(msgBody).build();
-
-         boolean subscribed = false;
-         boolean success = false;
-         final CountDownLatch isDone = new CountDownLatch(1);
-         long elapsedTime = 0L;
-
-         System.out.println("Starting PUBSUB test ...");
-         try {
-             // sub the topic
-             SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                 .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-             subscriber.subscribe(topic, subId, opts);
-             subscribed = true;
-
-             System.out.println("Sub topic " + topic.toStringUtf8() + ", subscriber id " + subId.toStringUtf8());
-
-             // pub topic
-             publisher.publish(topic, msg);
-             System.out.println("Pub topic " + topic.toStringUtf8() + " : " + msg.getBody().toStringUtf8());
-
-             // ensure subscriber first, publish next, then we start delivery to receive message
-             // if start delivery first before publish, isDone may notify before wait
-             subscriber.startDelivery(topic, subId, new MessageHandler() {
-
-                 @Override
-                 public void deliver(ByteString thisTopic, ByteString subscriberId,
-                                     Message message, Callback<Void> callback, Object context) {
-                     if (thisTopic.equals(topic) && subscriberId.equals(subId) &&
-                         msg.getBody().equals(message.getBody())) {
-                         System.out.println("Received message : " + message.getBody().toStringUtf8());
-                         isDone.countDown();
-                     }
-                     callback.operationFinished(context, null);
-                 }
-
-             });
-
-             // wait for the message
-             success = isDone.await(timeoutSecs, TimeUnit.SECONDS);
-             elapsedTime = MathUtils.now() - startTime;
-         } catch (InterruptedException ie) {
-             // restore the interrupt status for the caller instead of losing it
-             Thread.currentThread().interrupt();
-             System.err.println("PUBSUB interrupted while waiting for the message");
-         } catch (Exception e) {
-             // A failed check is reported through the return value, as before,
-             // but the cause is no longer discarded by a return-in-finally.
-             e.printStackTrace();
-         } finally {
-             if (subscribed) {
-                 try {
-                     subscriber.stopDelivery(topic, subId);
-                     subscriber.unsubscribe(topic, subId);
-                 } catch (Exception e) {
-                     System.err.println("PUBSUB cleanup failed");
-                     e.printStackTrace();
-                 }
-             }
-         }
-
-         if (success) {
-             System.out.println("PUBSUB SUCCESS. TIME: " + elapsedTime + " MS");
-         } else {
-             System.out.println("PUBSUB FAILED. ");
-         }
-         return success;
-     }
-
- }
-
- /** "readtopic": read the messages of a topic. */
- class ReadTopicCmd implements MyCommand {
-
-     /**
-      * Reads topic args[1]; when args[2] is present it is parsed as the
-      * message id to start from.
-      *
-      * @return false when the topic is missing, true otherwise
-      * @throws NumberFormatException if args[2] is not a long
-      */
-     @Override
-     public boolean runCmd(String[] args) throws Exception {
-         if (args.length < 2) {
-             return false;
-         }
-         final ByteString topicName = ByteString.copyFromUtf8(args[1]);
-         final ReadTopic reader = (args.length == 2)
-             ? new ReadTopic(admin, topicName, inConsole)
-             : new ReadTopic(admin, topicName, Long.parseLong(args[2]), inConsole);
-         reader.readTopic();
-         return true;
-     }
-
- }
-
- class ShowCmd implements MyCommand {
-
- static final int MAX_TOPICS_PER_SHOW = 100;
-
- @Override
- public boolean runCmd(String[] args) throws Exception {
- if (args.length < 2) {
- return false;
- }
- String errorMsg = null;
- try {
- if (HedwigCommands.SHOW_HUBS.equals(args[1])) {
- errorMsg = "Unable to fetch the list of hub servers";
- showHubs();
- } else if (HedwigCommands.SHOW_TOPICS.equals(args[1])) {
- errorMsg = "Unable to fetch the list of topics";
- showTopics();
- } else {
- System.err.println("ERROR: Unknown show command '" + args[1] + "'");
- return false;
- }
- } catch (Exception e) {
- if (null != errorMsg) {
- System.err.println(errorMsg);
- }
- e.printStackTrace();
- }
- return true;
- }
-
- protected void showHubs() throws Exception {
- Map<HedwigSocketAddress, HedwigAdmin.HubStats> hubs = admin.getAvailableHubs();
- System.out.println("Available Hub Servers:");
- for (Map.Entry<HedwigSocketAddress, HedwigAdmin.HubStats> entry : hubs.entrySet()) {
- System.out.println("\t" + entry.getKey() + " :\t" + entry.getValue());
- }
- }
-
- protected void showTopics() throws Exception {
- List<String> topics = new ArrayList<String>();
- Iterator<ByteString> iter = admin.getTopics();
-
- System.out.println("Topic List:");
- boolean stop = false;
- while (iter.hasNext()) {
- if (topics.size() >= MAX_TOPICS_PER_SHOW) {
- System.out.println(topics);
- topics.clear();
- stop = !continueOrQuit();
- if (stop) {
- break;
- }
- }
- ByteString t = iter.next();
- topics.add(t.toStringUtf8());
- }
- if (!stop) {
- System.out.println(topics);
- }
- }
-
-
-
- }
-
- class DescribeCmd implements MyCommand {
-
- @Override
- public boolean runCmd(String[] args) throws Exception {
- if (args.length < 3) {
- return false;
- }
- if (HedwigCommands.DESCRIBE_TOPIC.equals(args[1])) {
- return describeTopic(args[2]);
- } else {
- return false;
- }
- }
-
- protected boolean describeTopic(String topic) throws Exception {
- ByteString btopic = ByteString.copyFromUtf8(topic);
- HubInfo owner = admin.getTopicOwner(btopic);
- List<LedgerRange> ranges = admin.getTopicLedgers(btopic);
- Map<ByteString, SubscriptionData> states = admin.getTopicSubscriptions(btopic);
-
- System.out.println("===== Topic Information : " + topic + " =====");
- System.out.println();
- System.out.println("Owner : " + (owner == null ? "NULL" :
- owner.toString().trim().replaceAll("\n", ", ")));
- System.out.println();
-
- // print ledgers
- printTopicLedgers(ranges);
- // print subscriptions
- printTopicSubscriptions(states);
-
- return true;
- }
-
- private void printTopicLedgers(List<LedgerRange> ranges) {
- System.out.println(">>> Persistence Info <<<");
- if (null == ranges) {
- System.out.println("N/A");
- return;
- }
- if (ranges.isEmpty()) {
- System.out.println("No Ledger used.");
- return;
- }
- for (LedgerRange range : ranges) {
- System.out.println("Ledger " + range.getLedgerId() + " [ "
- + range.getStartSeqIdIncluded() + " ~ "
- + range.getEndSeqIdIncluded().getLocalComponent() + " ]");
- }
- System.out.println();
- }
-
- private void printTopicSubscriptions(Map<ByteString, SubscriptionData> states) {
- System.out.println(">>> Subscription Info <<<");
- if (0 == states.size()) {
- System.out.println("No subscriber.");
- return;
- }
- for (Map.Entry<ByteString, SubscriptionData> entry : states.entrySet()) {
- System.out.println("Subscriber " + entry.getKey().toStringUtf8() + " : "
- + SubscriptionStateUtils.toString(entry.getValue()));
- }
- System.out.println();
- }
-
- }
-
- class FormatCmd implements MyCommand {
-
- @Override
- public boolean runCmd(String[] args) throws Exception {
- boolean force = false;
- if (args.length >= 2 && "-force".equals(args[1])) {
- force = true;
- }
- boolean doFormat = true;
- System.out.println("You ask to format hedwig metadata stored in "
- + admin.getMetadataManagerFactory().getClass().getName() + ".");
- if (!force) {
- doFormat = continueOrQuit();
- }
- if (doFormat) {
- admin.format();
- System.out.println("Formatted hedwig metadata successfully.");
- } else {
- System.out.println("Given up formatting hedwig metadata.");
- }
- return true;
- }
-
- }
-
- protected Map<String, MyCommand> buildMyCommands() {
- Map<String, MyCommand> cmds =
- new HashMap<String, MyCommand>();
-
- ExitCmd exitCmd = new ExitCmd();
- cmds.put(EXIT, exitCmd);
- cmds.put(QUIT, exitCmd);
- cmds.put(HELP, new HelpCmd());
- cmds.put(HISTORY, new HistoryCmd());
- cmds.put(REDO, new RedoCmd());
- cmds.put(SET, new SetCmd());
- cmds.put(PUB, new PubCmd());
- cmds.put(SUB, new SubCmd());
- cmds.put(PUBSUB, new PubSubCmd());
- cmds.put(CLOSESUB, new CloseSubscriptionCmd());
- cmds.put(UNSUB, new UnsubCmd());
- cmds.put(RMSUB, new RmsubCmd());
- cmds.put(CONSUME, new ConsumeCmd());
- cmds.put(CONSUMETO, new ConsumeToCmd());
- cmds.put(SHOW, new ShowCmd());
- cmds.put(DESCRIBE, new DescribeCmd());
- cmds.put(READTOPIC, new ReadTopicCmd());
- cmds.put(FORMAT, new FormatCmd());
-
- return cmds;
- }
-
- static void usage() {
- System.err.println("HedwigConsole [options] [command] [args]");
- System.err.println();
- System.err.println("Avaiable commands:");
- for (String cmd : getHedwigCommands().keySet()) {
- System.err.println("\t" + cmd);
- }
- System.err.println();
- }
-
- /**
- * A storage class for both command line options and shell commands.
- */
- static private class MyCommandOptions {
-
- private Map<String,String> options = new HashMap<String,String>();
- private List<String> cmdArgs = null;
- private String command = null;
-
- public MyCommandOptions() {
- }
-
- public String getOption(String opt) {
- return options.get(opt);
- }
-
- public String getCommand( ) {
- return command;
- }
-
- public String getCmdArgument( int index ) {
- return cmdArgs.get(index);
- }
-
- public int getNumArguments( ) {
- return cmdArgs.size();
- }
-
- public String[] getArgArray() {
- return cmdArgs.toArray(new String[0]);
- }
-
- /**
- * Parses a command line that may contain one or more flags
- * before an optional command string
- * @param args command line arguments
- * @return true if parsing succeeded, false otherwise.
- */
- public boolean parseOptions(String[] args) {
- List<String> argList = Arrays.asList(args);
- Iterator<String> it = argList.iterator();
-
- while (it.hasNext()) {
- String opt = it.next();
- if (!opt.startsWith("-")) {
- command = opt;
- cmdArgs = new ArrayList<String>( );
- cmdArgs.add( command );
- while (it.hasNext()) {
- cmdArgs.add(it.next());
- }
- return true;
- } else {
- try {
- options.put(opt.substring(1), it.next());
- } catch (NoSuchElementException e) {
- System.err.println("Error: no argument found for option "
- + opt);
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * Breaks a string into command + arguments.
- * @param cmdstring string of form "cmd arg1 arg2..etc"
- * @return true if parsing succeeded.
- */
- public boolean parseCommand( String cmdstring ) {
- String[] args = cmdstring.split(" ");
- if (args.length == 0){
- return false;
- }
- command = args[0];
- cmdArgs = Arrays.asList(args);
- return true;
- }
- }
-
- private class MyWatcher implements Watcher {
- public void process(WatchedEvent event) {
- if (getPrintWatches()) {
- printMessage("WATCHER::");
- printMessage(event.toString());
- }
- }
- }
-
- public void printMessage(String msg) {
- if (inConsole) {
- System.out.println("\n"+msg);
- }
- }
-
- /**
- * Hedwig Console
- *
- * @param args arguments
- * @throws IOException
- * @throws InterruptedException
- */
- public HedwigConsole(String[] args) throws IOException, InterruptedException {
- // Setup Terminal
- terminal = Terminal.setupTerminal();
- HedwigCommands.init();
- cl.parseOptions(args);
-
- if (cl.getCommand() == null) {
- inConsole = true;
- } else {
- inConsole = false;
- }
-
- org.apache.bookkeeper.conf.ClientConfiguration bkClientConf =
- new org.apache.bookkeeper.conf.ClientConfiguration();
- ServerConfiguration hubServerConf = new ServerConfiguration();
- String serverCfgFile = cl.getOption("server-cfg");
- if (serverCfgFile != null) {
- try {
- hubServerConf.loadConf(new File(serverCfgFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new IOException(e);
- }
- try {
- bkClientConf.loadConf(new File(serverCfgFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new IOException(e);
- }
- }
-
- ClientConfiguration hubClientCfg = new ClientConfiguration();
- String clientCfgFile = cl.getOption("client-cfg");
- if (clientCfgFile != null) {
- try {
- hubClientCfg.loadConf(new File(clientCfgFile).toURI().toURL());
- } catch (ConfigurationException e) {
- throw new IOException(e);
- }
- }
-
- printMessage("Connecting to zookeeper/bookkeeper using HedwigAdmin");
- try {
- admin = new HedwigAdmin(bkClientConf, hubServerConf);
- admin.getZkHandle().register(new MyWatcher());
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- printMessage("Connecting to default hub server " + hubClientCfg.getDefaultServerHost());
- hubClient = new HedwigClient(hubClientCfg);
- publisher = hubClient.getPublisher();
- subscriber = hubClient.getSubscriber();
- subscriber.addSubscriptionListener(new ConsoleSubscriptionListener());
-
- // other parameters
- myRegion = hubServerConf.getMyRegion();
- }
-
- public boolean getPrintWatches() {
- return printWatches;
- }
-
- protected String getPrompt() {
- StringBuilder sb = new StringBuilder();
- sb.append("[hedwig: (").append(myRegion).append(") ").append(commandCount).append("] ");
- return sb.toString();
- }
-
- protected boolean continueOrQuit() throws IOException {
- System.out.println("Press <Return> to continue, or Q to cancel ...");
- int ch;
- if (null != console) {
- ch = console.readCharacter(CONTINUE_OR_QUIT);
- } else {
- do {
- ch = terminal.readCharacter(System.in);
- } while (ch != 'q' && ch != 'Q' && ch != '\n');
- }
- if (ch == 'q' ||
- ch == 'Q') {
- return false;
- }
- return true;
- }
-
- protected void addToHistory(int i, String cmd) {
- history.put(i, cmd);
- }
-
- public void executeLine(String line) {
- if (!line.equals("")) {
- cl.parseCommand(line);
- addToHistory(commandCount, line);
- processCmd(cl);
- commandCount++;
- }
- }
-
- protected boolean processCmd(MyCommandOptions co) {
- String[] args = co.getArgArray();
- String cmd = co.getCommand();
- if (args.length < 1) {
- usage();
- return false;
- }
- if (!getHedwigCommands().containsKey(cmd)) {
- usage();
- return false;
- }
-
- LOG.debug("Processing {}", cmd);
-
- MyCommand myCommand = myCommands.get(cmd);
- if (myCommand == null) {
- System.err.println("No Command Processor found for command " + cmd);
- usage();
- return false;
- }
-
- long startTime = MathUtils.now();
- boolean success = false;
- try {
- success = myCommand.runCmd(args);
- } catch (Exception e) {
- e.printStackTrace();
- success = false;
- }
- long elapsedTime = MathUtils.now() - startTime;
- if (inConsole) {
- if (success) {
- System.out.println("Finished " + ((double)elapsedTime / 1000) + " s.");
- } else {
- COMMAND c = getHedwigCommands().get(cmd);
- if (c != null) {
- c.printUsage();
- }
- }
- }
- return success;
- }
-
- @SuppressWarnings("unchecked")
- void run() throws IOException {
- inConsole = true;
- myCommands = buildMyCommands();
- if (cl.getCommand() == null) {
- System.out.println("Welcome to Hedwig!");
- System.out.println("JLine support is enabled");
-
- console = new ConsoleReader();
- JLineHedwigCompletor completor = new JLineHedwigCompletor(admin);
- console.addCompletor(completor);
-
- // load history file
- History history = new History();
- File file = new File(System.getProperty("hw.history",
- new File(System.getProperty("user.home"), HW_HISTORY_FILE).toString()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("History file is " + file.toString());
- }
- history.setHistoryFile(file);
- // set history to console reader
- console.setHistory(history);
- // load history from history file
- history.moveToFirstEntry();
-
- while (history.next()) {
- String entry = history.current();
- if (!entry.equals("")) {
- addToHistory(commandCount, entry);
- }
- commandCount++;
- }
- System.out.println("JLine history support is enabled");
-
- String line;
- while ((line = console.readLine(getPrompt())) != null) {
- executeLine(line);
- history.addToHistory(line);
- }
- }
-
- inConsole = false;
- processCmd(cl);
- try {
- myCommands.get(EXIT).runCmd(new String[0]);
- } catch (Exception e) {
- }
- }
-
- public static void main(String[] args) throws IOException, InterruptedException {
- HedwigConsole console = new HedwigConsole(args);
- console.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
deleted file mode 100644
index cdc0c33..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/JLineHedwigCompletor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.hedwig.admin.HedwigAdmin;
-
-import com.google.protobuf.ByteString;
-
-import jline.Completor;
-
-import static org.apache.hedwig.admin.console.HedwigCommands.*;
-
-/**
- * A jline completor for hedwig console
- */
-public class JLineHedwigCompletor implements Completor {
- // for topic completion
- static final int MAX_TOPICS_TO_SEARCH = 1000;
-
- private HedwigAdmin admin;
-
- public JLineHedwigCompletor(HedwigAdmin admin) {
- this.admin = admin;
- }
-
- @Override
- public int complete(String buffer, int cursor, List candidates) {
- // Guarantee that the final token is the one we're expanding
- buffer = buffer.substring(0,cursor);
- String[] tokens = buffer.split(" ");
- if (buffer.endsWith(" ")) {
- String[] newTokens = new String[tokens.length + 1];
- System.arraycopy(tokens, 0, newTokens, 0, tokens.length);
- newTokens[newTokens.length - 1] = "";
- tokens = newTokens;
- }
-
- if (tokens.length > 2 &&
- DESCRIBE.equalsIgnoreCase(tokens[0]) &&
- DESCRIBE_TOPIC.equalsIgnoreCase(tokens[1])) {
- return completeTopic(buffer, tokens[2], candidates);
- } else if (tokens.length > 1 &&
- (SUB.equalsIgnoreCase(tokens[0]) ||
- PUB.equalsIgnoreCase(tokens[0]) ||
- CLOSESUB.equalsIgnoreCase(tokens[0]) ||
- CONSUME.equalsIgnoreCase(tokens[0]) ||
- CONSUMETO.equalsIgnoreCase(tokens[0]) ||
- READTOPIC.equalsIgnoreCase(tokens[0]))) {
- return completeTopic(buffer, tokens[1], candidates);
- }
- List cmds = HedwigCommands.findCandidateCommands(tokens);
- return completeCommand(buffer, tokens[tokens.length - 1], cmds, candidates);
- }
-
- @SuppressWarnings("unchecked")
- private int completeCommand(String buffer, String token,
- List commands, List candidates) {
- for (Object cmdo : commands) {
- assert (cmdo instanceof String);
- if (((String)cmdo).startsWith(token)) {
- candidates.add(cmdo);
- }
- }
- return buffer.lastIndexOf(" ") + 1;
- }
-
- @SuppressWarnings("unchecked")
- private int completeTopic(String buffer, String token, List candidates) {
- try {
- Iterator<ByteString> children = admin.getTopics();
- int i = 0;
- while (children.hasNext() && i <= MAX_TOPICS_TO_SEARCH) {
- String child = children.next().toStringUtf8();
- if (child.startsWith(token)) {
- candidates.add(child);
- }
- ++i;
- }
- } catch (Exception e) {
- return buffer.length();
- }
- return candidates.size() == 0 ? buffer.length() : buffer.lastIndexOf(" ") + 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java b/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
deleted file mode 100644
index edad190..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/admin/console/ReadTopic.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.admin.console;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.hedwig.admin.HedwigAdmin;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
-import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * A tool to read topic messages.
- *
- * This tool :
- * 1) read persistence info from zookeeper: ledger ranges
- * 2) read subscription infor from zookeeper: we can know the least message id (ledger id)
- * 3) use bk client to read message starting from least message id
- */
-public class ReadTopic {
-
- final HedwigAdmin admin;
- final ByteString topic;
- long startSeqId;
- long leastConsumedSeqId = Long.MAX_VALUE;
- final boolean inConsole;
-
- static final int RC_OK = 0;
- static final int RC_ERROR = -1;
- static final int RC_NOTOPIC = -2;
- static final int RC_NOLEDGERS = -3;
- static final int RC_NOSUBSCRIBERS = -4;
-
- static final int NUM_MESSAGES_TO_PRINT = 15;
-
- List<LedgerRange> ledgers = new ArrayList<LedgerRange>();
-
- /**
- * Constructor
- */
- public ReadTopic(HedwigAdmin admin, ByteString topic, boolean inConsole) {
- this(admin, topic, 1, inConsole);
- }
-
- /**
- * Constructor
- */
- public ReadTopic(HedwigAdmin admin, ByteString topic, long msgSeqId, boolean inConsole) {
- this.admin = admin;
- this.topic = topic;
- this.startSeqId = msgSeqId;
- this.inConsole = inConsole;
- }
-
- /**
- * Check whether the topic existed or not
- *
- * @return RC_OK if topic is existed; RC_NOTOPIC if not.
- * @throws Exception
- */
- protected int checkTopic() throws Exception {
- return admin.hasTopic(topic) ? RC_OK : RC_NOTOPIC;
- }
-
- /**
- * Get the ledgers used by this topic to store messages
- *
- * @return RC_OK if topic has messages; RC_NOLEDGERS if not.
- * @throws Exception
- */
- protected int getTopicLedgers() throws Exception {
- List<LedgerRange> ranges = admin.getTopicLedgers(topic);
- if (null == ranges || ranges.isEmpty()) {
- return RC_NOLEDGERS;
- }
- ledgers.addAll(ranges);
- return RC_OK;
- }
-
- protected int getLeastSubscription() throws Exception {
- Map<ByteString, SubscriptionData> states = admin.getTopicSubscriptions(topic);
- if (states.isEmpty()) {
- return RC_NOSUBSCRIBERS;
- }
- for (Map.Entry<ByteString, SubscriptionData> entry : states.entrySet()) {
- SubscriptionData state = entry.getValue();
- long localMsgId = state.getState().getMsgId().getLocalComponent();
- if (localMsgId < leastConsumedSeqId) {
- leastConsumedSeqId = localMsgId;
- }
- }
- if (leastConsumedSeqId == Long.MAX_VALUE) {
- leastConsumedSeqId = 0;
- }
- return RC_OK;
- }
-
- public void readTopic() {
- try {
- int rc = _readTopic();
- switch (rc) {
- case RC_NOTOPIC:
- System.err.println("No topic " + topic + " found.");
- break;
- case RC_NOLEDGERS:
- System.err.println("No message is published to topic " + topic);
- break;
- default:
- break;
- }
- } catch (Exception e) {
- System.err.println("ERROR: read messages of topic " + topic + " failed.");
- e.printStackTrace();
- }
- }
-
- protected int _readTopic() throws Exception {
- int rc;
- // check topic
- rc = checkTopic();
- if (RC_OK != rc) {
- return rc;
- }
- // get topic ledgers
- rc = getTopicLedgers();
- if (RC_OK != rc) {
- return rc;
- }
- // get topic subscription to find the least one
- rc = getLeastSubscription();
- if (RC_NOSUBSCRIBERS == rc) {
- startSeqId = 1;
- } else if (RC_OK == rc) {
- if (leastConsumedSeqId > startSeqId) {
- startSeqId = leastConsumedSeqId + 1;
- }
- } else {
- return rc;
- }
-
- for (LedgerRange range : ledgers) {
- long endSeqId = range.getEndSeqIdIncluded().getLocalComponent();
- if (endSeqId < startSeqId) {
- continue;
- }
- boolean toContinue = readLedger(range);
- startSeqId = endSeqId + 1;
- if (!toContinue) {
- break;
- }
- }
-
- return RC_OK;
- }
-
- /**
- * Read a specific ledger
- *
- * @param ledger in memory ledger range
- * @param endSeqId end seq id
- * @return true if continue, otherwise false
- * @throws BKException
- * @throws IOException
- * @throws InterruptedException
- */
- protected boolean readLedger(LedgerRange ledger)
- throws BKException, IOException, InterruptedException {
- long tEndSeqId = ledger.getEndSeqIdIncluded().getLocalComponent();
-
- if (tEndSeqId < this.startSeqId) {
- return true;
- }
- // Open Ledger Handle
- long ledgerId = ledger.getLedgerId();
- System.out.println("\n>>>>> " + ledger + " <<<<<\n");
- LedgerHandle lh = null;
- try {
- lh = admin.getBkHandle().openLedgerNoRecovery(ledgerId, admin.getBkDigestType(), admin.getBkPasswd());
- } catch (BKException e) {
- System.err.println("ERROR: No ledger " + ledgerId + " found. maybe garbage collected due to the messages are consumed.");
- }
- if (null == lh) {
- return true;
- }
- long expectedEntryId = startSeqId - ledger.getStartSeqIdIncluded();
-
- long correctedEndSeqId = tEndSeqId;
- try {
- while (startSeqId <= tEndSeqId) {
- correctedEndSeqId = Math.min(startSeqId + NUM_MESSAGES_TO_PRINT - 1, tEndSeqId);
-
- try {
- Enumeration<LedgerEntry> seq =
- lh.readEntries(startSeqId - ledger.getStartSeqIdIncluded(),
- correctedEndSeqId - ledger.getStartSeqIdIncluded());
- LedgerEntry entry = null;
- while (seq.hasMoreElements()) {
- entry = seq.nextElement();
- Message message;
- try {
- message = Message.parseFrom(entry.getEntryInputStream());
- } catch (IOException e) {
- System.out.println("WARN: Unreadable message found\n");
- expectedEntryId++;
- continue;
- }
- if (expectedEntryId != entry.getEntryId()
- || (message.getMsgId().getLocalComponent() - ledger.getStartSeqIdIncluded()) != expectedEntryId) {
- throw new IOException("ERROR: Message ids are out of order : expected entry id " + expectedEntryId
- + ", current entry id " + entry.getEntryId() + ", msg seq id " + message.getMsgId().getLocalComponent());
- }
- expectedEntryId++;
- formatMessage(message);
-
- }
- startSeqId = correctedEndSeqId + 1;
- if (inConsole) {
- if (!pressKeyToContinue()) {
- return false;
- }
- }
- } catch (BKException.BKReadException be) {
- throw be;
- }
- }
- } catch (BKException bke) {
- if (tEndSeqId != Long.MAX_VALUE) {
- System.err.println("ERROR: ledger " + ledgerId + " may be corrupted, since read messages ["
- + startSeqId + " ~ " + correctedEndSeqId + " ] failed :");
- throw bke;
- }
- }
- System.out.println("\n");
- return true;
- }
-
- protected void formatMessage(Message message) {
- // print msg id
- String msgId;
- if (!message.hasMsgId()) {
- msgId = "N/A";
- } else {
- MessageSeqId seqId = message.getMsgId();
- StringBuilder idBuilder = new StringBuilder();
- if (seqId.hasLocalComponent()) {
- idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")");
- } else {
- List<RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
- int i = 0, numRegions = remoteIds.size();
- idBuilder.append("REMOTE(");
- for (RegionSpecificSeqId rssid : remoteIds) {
- idBuilder.append(rssid.getRegion().toStringUtf8());
- idBuilder.append("[");
- idBuilder.append(rssid.getSeqId());
- idBuilder.append("]");
- ++i;
- if (i < numRegions) {
- idBuilder.append(",");
- }
- }
- idBuilder.append(")");
- }
- msgId = idBuilder.toString();
- }
- System.out.println("---------- MSGID=" + msgId + " ----------");
- System.out.println("MsgId: " + msgId);
- // print source region
- if (message.hasSrcRegion()) {
- System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8());
- } else {
- System.out.println("SrcRegion: N/A");
- }
- // print message body
- System.out.println("Message:");
- System.out.println();
- if (message.hasBody()) {
- System.out.println(message.getBody().toStringUtf8());
- } else {
- System.out.println("N/A");
- }
- System.out.println();
- }
-
- boolean pressKeyToContinue() throws IOException {
- System.out.println("Press Y to continue...");
- BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, UTF_8));
- int ch = stdin.read();
- if (ch == 'y' ||
- ch == 'Y') {
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java b/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
deleted file mode 100644
index 6ac6879..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/data/MessageFormatter.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.hedwig.data;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.bookkeeper.util.EntryFormatter;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.RegionSpecificSeqId;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Format a pub sub message into a readable format.
- */
-public class MessageFormatter extends EntryFormatter {
- private static final Logger logger = LoggerFactory.getLogger(MessageFormatter.class);
-
- static final String MESSAGE_PAYLOAD_FORMATTER_CLASS = "message_payload_formatter_class";
-
- EntryFormatter dataFormatter = EntryFormatter.STRING_FORMATTER;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- dataFormatter = EntryFormatter.newEntryFormatter(conf, MESSAGE_PAYLOAD_FORMATTER_CLASS);
- }
-
- @Override
- public void formatEntry(java.io.InputStream input) {
- Message message;
- try {
- message = Message.parseFrom(input);
- } catch (IOException e) {
- System.out.println("WARN: Unreadable message found\n");
- EntryFormatter.STRING_FORMATTER.formatEntry(input);
- return;
- }
- formatMessage(message);
- }
-
- @Override
- public void formatEntry(byte[] data) {
- Message message;
- try {
- message = Message.parseFrom(data);
- } catch (IOException e) {
- System.out.println("WARN: Unreadable message found\n");
- EntryFormatter.STRING_FORMATTER.formatEntry(data);
- return;
- }
- formatMessage(message);
- }
-
- void formatMessage(Message message) {
- // print msg id
- String msgId;
- if (!message.hasMsgId()) {
- msgId = "N/A";
- } else {
- MessageSeqId seqId = message.getMsgId();
- StringBuilder idBuilder = new StringBuilder();
- if (seqId.hasLocalComponent()) {
- idBuilder.append("LOCAL(").append(seqId.getLocalComponent()).append(")");
- } else {
- List<RegionSpecificSeqId> remoteIds = seqId.getRemoteComponentsList();
- int i = 0, numRegions = remoteIds.size();
- idBuilder.append("REMOTE(");
- for (RegionSpecificSeqId rssid : remoteIds) {
- idBuilder.append(rssid.getRegion().toStringUtf8());
- idBuilder.append("[");
- idBuilder.append(rssid.getSeqId());
- idBuilder.append("]");
- ++i;
- if (i < numRegions) {
- idBuilder.append(",");
- }
- }
- idBuilder.append(")");
- }
- msgId = idBuilder.toString();
- }
- System.out.println("****** MSGID=" + msgId + " ******");
- System.out.println("MessageId: " + msgId);
- // print source region
- if (message.hasSrcRegion()) {
- System.out.println("SrcRegion: " + message.getSrcRegion().toStringUtf8());
- } else {
- System.out.println("SrcRegion: N/A");
- }
- // print message body
- if (message.hasBody()) {
- System.out.println("Body:");
- dataFormatter.formatEntry(message.getBody().toByteArray());
- } else {
- System.out.println("Body: N/A");
- }
- System.out.println();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
deleted file mode 100644
index 21687eb..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public abstract class AbstractBenchmark {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractBenchmark.class);
-
- AtomicLong totalLatency = new AtomicLong();
- LinkedBlockingQueue<Boolean> doneSignalQueue = new LinkedBlockingQueue<Boolean>();
-
- abstract void doOps(int numOps) throws Exception;
- abstract void tearDown() throws Exception;
-
- protected class AbstractCallback {
- AtomicInteger numDone = new AtomicInteger(0);
- Semaphore outstanding;
- int numOps;
- boolean logging;
-
- public AbstractCallback(Semaphore outstanding, int numOps) {
- this.outstanding = outstanding;
- this.numOps = numOps;
- logging = Boolean.getBoolean("progress");
- }
-
- public void handle(boolean success, Object ctx) {
- outstanding.release();
-
- if (!success) {
- ConcurrencyUtils.put(doneSignalQueue, false);
- return;
- }
-
- totalLatency.addAndGet(MathUtils.now() - (Long)ctx);
- int numDoneInt = numDone.incrementAndGet();
-
- if (logging && numDoneInt % 10000 == 0) {
- logger.info("Finished " + numDoneInt + " ops");
- }
-
- if (numOps == numDoneInt) {
- ConcurrencyUtils.put(doneSignalQueue, true);
- }
- }
- }
-
- public void runPhase(String phase, int numOps) throws Exception {
- long startTime = MathUtils.now();
-
- doOps(numOps);
-
- if (!doneSignalQueue.take()) {
- logger.error("One or more operations failed in phase: " + phase);
- throw new RuntimeException();
- } else {
- logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (MathUtils.now() - startTime)));
- }
- }
-
-
-
-
-
- public void run() throws Exception {
-
- int numWarmup = Integer.getInteger("nWarmup", 50000);
- runPhase("warmup", numWarmup);
-
- logger.info("Sleeping for 10 seconds");
- Thread.sleep(10000);
- //reset latency
- totalLatency.set(0);
-
- int numOps = Integer.getInteger("nOps", 400000);
- runPhase("real", numOps);
-
- tearDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java b/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
deleted file mode 100644
index d58883d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.benchmark;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.OrderedSafeExecutor;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-public class BookieBenchmark extends AbstractBenchmark {
-
- private static final Logger logger = LoggerFactory.getLogger(BookkeeperBenchmark.class);
-
- BookieClient bkc;
- BookieSocketAddress addr;
- ClientSocketChannelFactory channelFactory;
- OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder()
- .name("BookieBenchmarkScheduler")
- .numThreads(1)
- .build();
-
- public BookieBenchmark(String bookieHostPort) throws Exception {
- channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- bkc = new BookieClient(new ClientConfiguration(), channelFactory, executor);
- String[] hostPort = bookieHostPort.split(":");
- addr = new BookieSocketAddress(hostPort[0], Integer.parseInt(hostPort[1]));
- }
-
-
- @Override
- void doOps(final int numOps) throws Exception {
- int numOutstanding = Integer.getInteger("nPars",1000);
- final Semaphore outstanding = new Semaphore(numOutstanding);
-
-
- WriteCallback callback = new WriteCallback() {
- AbstractCallback handler = new AbstractCallback(outstanding, numOps);
-
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
- handler.handle(rc == BKException.Code.OK, ctx);
- }
- };
-
- byte[] passwd = new byte[20];
- int size = Integer.getInteger("size", 1024);
- byte[] data = new byte[size];
-
- for (int i=0; i<numOps; i++) {
- outstanding.acquire();
-
- ByteBuffer buffer = ByteBuffer.allocate(44);
- long ledgerId = 1000;
- buffer.putLong(ledgerId);
- buffer.putLong(i);
- buffer.putLong(0);
- buffer.put(passwd);
- buffer.rewind();
- ChannelBuffer toSend = ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer.slice()), ChannelBuffers.wrappedBuffer(data));
- bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, MathUtils.now(), 0);
- }
-
- }
-
- @Override
- public void tearDown() {
- bkc.close();
- channelFactory.releaseExternalResources();
- executor.shutdown();
- }
-
-
- public static void main(String[] args) throws Exception {
- BookieBenchmark benchmark = new BookieBenchmark(args[0]);
- benchmark.run();
- }
-
-
-}