You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/06/07 06:10:29 UTC

[GitHub] sijie closed pull request #1485: [table service] Move table service cli to bookkeeper-tools

sijie closed pull request #1485: [table service] Move table service cli to bookkeeper-tools
URL: https://github.com/apache/bookkeeper/pull/1485
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of such a foreign pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/bin/common.sh b/bin/common.sh
index 0c048c9c6..0a0146f99 100755
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -149,7 +149,7 @@ find_module_jar() {
       read -p "Do you want me to run \`mvn package -DskiptTests\` for you ? " answer
       case "${answer:0:1}" in
         y|Y )
-          mvn package -DskipTests
+          mvn package -DskipTests -Dstream
           ;;
         * )
           exit 1
@@ -183,7 +183,7 @@ add_maven_deps_to_classpath() {
   # clean it up and force us create a new one.
   f="${MODULE_PATH}/target/cached_classpath.txt"
   if [ ! -f ${f} ]; then
-    ${MVN} -f "${MODULE_PATH}/pom.xml" dependency:build-classpath -Dmdep.outputFile="target/cached_classpath.txt" &> /dev/null
+    ${MVN} -f "${MODULE_PATH}/pom.xml" -Dstream dependency:build-classpath -Dmdep.outputFile="target/cached_classpath.txt" &> /dev/null
   fi
 }
 
diff --git a/bin/standalone.docker-compose b/bin/standalone.docker-compose
index 25e9fcbcb..4cf1ffd2b 100755
--- a/bin/standalone.docker-compose
+++ b/bin/standalone.docker-compose
@@ -54,23 +54,30 @@ function gen_bookie_section() {
   local cluster=$1
   local bookie_name=$2
   local bookie_port=$3
-  local image=$4
+  local bookie_http_port=$4
+  local bookie_grpc_port=$5
+  local image=$6
   cat <<EOF
   ${bookie_name}:
     image: ${image}
     depends_on:
       - metadata-service
     environment:
+      # metadata service
       - BK_zkServers=metadata-service
-      - BK_metadataServiceUri=zk://metadata-service/ledgers
       - BK_zkLedgersRootPath=/ledgers
-      - BK_journalDirectory=/data/bookkeeper/journal
-      - BK_ledgerDirectories=/data/bookkeeper/ledgers
-      - BK_indexDirectories=/data/bookkeeper/ledgers
-      - BK_advertisedAddress=localhost
-      - BK_bookiePort=${bookie_port}
+      - BK_metadataServiceUri=zk://metadata-service/ledgers
+      # bookie
+      - BK_DATA_DIR=/data/bookkeeper
+      - BK_advertisedAddress=${bookie_name}
+      # bookie http
+      - BK_httpServerEnabled=true
+      # stream storage
+      - BK_extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
     ports:
-      - "${bookie_port}:${bookie_port}"
+      - "${bookie_port}:3181"
+      - "${bookie_http_port}:8080"
+      - "${bookie_grpc_port}:4181"
     volumes:
       - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/journal:/data/bookkeeper/journal"
       - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/ledgers:/data/bookkeeper/ledgers"
@@ -94,7 +101,9 @@ function generate_docker_compose_file() {
   local BI=0
   while [ ${BI} -lt $((num_bookies)) ]; do
     local bookie_port=$((3181 + BI))
-    local bookie_section=$(gen_bookie_section ${cluster} "bookie-${BI}" ${bookie_port} ${image})
+    local bookie_http_port=$((8080 + BI))
+    local bookie_grpc_port=$((4181 + BI))
+    local bookie_section=$(gen_bookie_section ${cluster} "bookie-${BI}" ${bookie_port} ${bookie_http_port} ${bookie_grpc_port} ${image})
     echo "${bookie_section}"        >> ${docker_compose_file}
     let BI=BI+1
   done
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
index 64c630879..a2d1bbcf0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
@@ -21,6 +21,7 @@
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -37,6 +38,7 @@ protected BookieCommand(CliSpec<BookieFlagsT> spec) {
     @Override
     protected boolean apply(ServiceURI serviceURI,
                             CompositeConfiguration conf,
+                            BKFlags globalFlags,
                             BookieFlagsT cmdFlags) {
         ServerConfiguration serverConf = new ServerConfiguration();
         serverConf.loadConf(conf);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
index 7738911ca..ae807dfb6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
@@ -24,6 +24,7 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -41,6 +42,7 @@ protected ClientCommand(CliSpec<ClientFlagsT> spec) {
     @Override
     protected boolean apply(ServiceURI serviceURI,
                             CompositeConfiguration conf,
+                            BKFlags globalFlags,
                             ClientFlagsT cmdFlags) {
         ClientConfiguration clientConf = new ClientConfiguration();
         clientConf.loadConf(conf);
diff --git a/docker/scripts/common.sh b/docker/scripts/common.sh
index ece24ba1b..5bbcd2080 100755
--- a/docker/scripts/common.sh
+++ b/docker/scripts/common.sh
@@ -28,22 +28,48 @@ BK_CLUSTER_ROOT_PATH=${BK_CLUSTER_ROOT_PATH:-""}
 
 # bk env vars to replace values in config files
 export BK_HOME=/opt/bookkeeper
-export BK_bookiePort=${BK_bookiePort:-${PORT0}}
+# metadata service
 export BK_zkServers=${BK_zkServers}
 export BK_zkLedgersRootPath=${BK_zkLedgersRootPath:-"${BK_CLUSTER_ROOT_PATH}/ledgers"}
+export BK_metadataServiceUri=${BK_metadataServiceUri:-"zk://${BK_zkServers}${BK_zkLedgersRootPath}"}
+# bookie
+export BK_bookiePort=${BK_bookiePort:-${PORT0}}
+export BK_httpServerEnabled=${BK_httpServerEnabled:-"true"}
+export BK_httpServerPort=${BK_httpServerPort:-${BOOKIE_HTTP_PORT}}
 export BK_journalDirectory=${BK_journalDirectory:-${BK_DATA_DIR}/journal}
 export BK_ledgerDirectories=${BK_ledgerDirectories:-${BK_DATA_DIR}/ledgers}
-export BK_indexDirectories=${BK_indexDirectories:-${BK_DATA_DIR}/index}
-export BK_metadataServiceUri=${BK_metadataServiceUri:-"zk://${BK_zkServers}${BK_zkLedgersRootPath}"}
+export BK_indexDirectories=${BK_indexDirectories:-${BK_ledgerDirectories}}
+# dlog
 export BK_dlogRootPath=${BK_dlogRootPath:-"${BK_CLUSTER_ROOT_PATH}/distributedlog"}
+# stream storage
+export BK_NUM_STORAGE_CONTAINERS=${BK_NUM_STORAGE_CONTAINERS:-"32"}
+export BK_STREAM_STORAGE_ROOT_PATH=${BK_STREAM_STORAGE_ROOT_PATH:-"/stream"}
 
 echo "Environment Vars for bookie:"
-echo "  BK_bookiePort bookie service port is $BK_bookiePort"
+echo ""
+echo "  [metadata service]"
 echo "  BK_zkServers is $BK_zkServers"
-echo "  BK_DATA_DIR is $BK_DATA_DIR"
 echo "  BK_CLUSTER_ROOT_PATH is $BK_CLUSTER_ROOT_PATH"
 echo "  BK_metadataServiceUri is $BK_metadataServiceUri"
+echo ""
+echo "  [bookie]"
+echo "  BK_bookiePort bookie service port is $BK_bookiePort"
+echo "  BK_DATA_DIR is $BK_DATA_DIR"
+echo "  BK_journalDirectory is ${BK_journalDirectory}"
+echo "  BK_ledgerDirectories are ${BK_ledgerDirectories}"
+echo "  BK_indexDirectories are ${BK_indexDirectories}"
+echo ""
+echo "  [bookie http]"
+echo "  BK_httpServerEnabled is ${BK_httpServerEnabled}"
+echo "  BK_httpServerPort is ${BK_httpServerPort}"
+echo ""
+echo "  [dlog]"
 echo "  BK_dlogRootPath is $BK_dlogRootPath"
+echo ""
+echo "  [stream storage]"
+echo "  BK_STREAM_STORAGE_ROOT_PATH is ${BK_STREAM_STORAGE_ROOT_PATH}"
+echo "  BK_NUM_STORAGE_CONTAINERS is ${BK_NUM_STORAGE_CONTAINERS}"
+echo "  BOOKIE_GRPC_PORT is ${BOOKIE_GRPC_PORT}"
 
 python scripts/apply-config-from-env.py ${BK_HOME}/conf
 
diff --git a/docker/scripts/init_bookie.sh b/docker/scripts/init_bookie.sh
index 09aab1d6c..07a7feecd 100755
--- a/docker/scripts/init_bookie.sh
+++ b/docker/scripts/init_bookie.sh
@@ -28,27 +28,34 @@ function wait_for_zookeeper() {
 }
 
 function create_zk_root() {
-    echo "create the zk root dir for bookkeeper"
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create ${BK_CLUSTER_ROOT_PATH}
+    if [ "x${BK_CLUSTER_ROOT_PATH}" != "x" ]; then
+        echo "create the zk root dir for bookkeeper at '${BK_CLUSTER_ROOT_PATH}'"
+        /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create ${BK_CLUSTER_ROOT_PATH}
+    fi
 }
 
 # Init the cluster if required znodes not exist in Zookeeper.
 # Use ephemeral zk node as lock to keep initialize atomic.
 function init_cluster() {
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_zkLedgersRootPath}/available/readonly
+    if [ "x${BK_STREAM_STORAGE_ROOT_PATH}" == "x" ]; then
+        echo "BK_STREAM_STORAGE_ROOT_PATH is not set. fail fast."
+        exit -1
+    fi
+
+    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}
     if [ $? -eq 0 ]; then
-        echo "Metadata of cluster already exists, no need format"
+        echo "Metadata of cluster already exists, no need to init"
     else
         # create ephemeral zk node bkInitLock, initiator who this node, then do init; other initiators will wait.
         /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create -e ${BK_CLUSTER_ROOT_PATH}/bkInitLock
         if [ $? -eq 0 ]; then
             # bkInitLock created success, this is the successor to do znode init
-            echo "Bookkeeper znodes not exist in Zookeeper, do the init to create them."
-            /opt/bookkeeper/bin/bookkeeper shell initnewcluster
+            echo "Initializing bookkeeper cluster at service uri ${BK_metadataServiceUri}."
+            /opt/bookkeeper/bin/bkctl --service-uri ${BK_metadataServiceUri} cluster init
             if [ $? -eq 0 ]; then
-                echo "Bookkeeper znodes init success."
+                echo "Successfully initialized bookkeeper cluster at service uri ${BK_metadataServiceUri}."
             else
-                echo "Bookkeeper znodes init failed. please check the reason."
+                echo "Failed to initialize bookkeeper cluster at service uri ${BK_metadataServiceUri}. please check the reason."
                 exit
             fi
         else
@@ -57,7 +64,8 @@ function init_cluster() {
             while [ ${tenSeconds} -lt 10 ]
             do
                 sleep 10
-                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_zkLedgersRootPath}/available/readonly
+                echo "run '/opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}'"
+                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}
                 if [ $? -eq 0 ]; then
                     echo "Waited $tenSeconds * 10 seconds, bookkeeper inited"
                     break
@@ -76,50 +84,6 @@ function init_cluster() {
     fi
 }
 
-# Create default dlog namespace
-# Use ephemeral zk node as lock to keep initialize atomic.
-function create_dlog_namespace() {
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_dlogRootPath}
-    if [ $? -eq 0 ]; then
-        echo "Dlog namespace already created, no need to create another one"
-    else
-        # create ephemeral zk node dlogInitLock, initiator who this node, then do init; other initiators will wait.
-        /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create -e ${BK_CLUSTER_ROOT_PATH}/dlogInitLock
-        if [ $? -eq 0 ]; then
-            # dlogInitLock created success, this is the successor to do znode init
-            echo "Dlog namespace not exist, do the init to create them."
-            /opt/bookkeeper/bin/dlog admin bind -l ${BK_zkLedgersRootPath} -s ${BK_zkServers} -c distributedlog://${BK_zkServers}${BK_dlogRootPath}
-            if [ $? -eq 0 ]; then
-                echo "Dlog namespace is created successfully."
-            else
-                echo "Failed to create dlog namespace ${BK_dlogRootPath}. please check the reason."
-                exit
-            fi
-        else
-            echo "Other docker instance is doing initialize at the same time, will wait in this instance."
-            tenSeconds=1
-            while [ ${tenSeconds} -lt 10 ]
-            do
-                sleep 10
-                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_dlogRootPath}
-                if [ $? -eq 0 ]; then
-                    echo "Waited $tenSeconds * 10 seconds, dlog namespace created"
-                    break
-                else
-                    echo "Waited $tenSeconds * 10 seconds, dlog namespace still not created"
-                    (( tenSeconds++ ))
-                    continue
-                fi
-            done
-
-            if [ ${tenSeconds} -eq 10 ]; then
-                echo "Waited 100 seconds for creating dlog namespace, something wrong, please check"
-                exit
-            fi
-        fi
-    fi
-}
-
 function init_bookie() {
 
     # create dirs if they don't exist
@@ -134,7 +98,4 @@ function init_bookie() {
     # init the cluster
     init_cluster
 
-    # create dlog namespace
-    create_dlog_namespace
-
 }
\ No newline at end of file
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
deleted file mode 100644
index b1ac23992..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.stream.cli.commands.CmdBase;
-import org.apache.bookkeeper.stream.cli.commands.CmdNamespace;
-import org.apache.bookkeeper.stream.cli.commands.CmdStream;
-import org.apache.bookkeeper.stream.cli.commands.CmdTable;
-
-/**
- * Bookie Shell.
- */
-@Slf4j
-public class StreamStorageCli {
-
-    /**
-     * Make this command map static. This provides a way to plugin different sub commands.
-     */
-    private static final Map<String, Class> commandMap;
-
-    static {
-        commandMap = new TreeMap<>();
-
-        // build the default command map
-        commandMap.put("namespace", CmdNamespace.class);
-        commandMap.put("stream", CmdStream.class);
-        commandMap.put("table", CmdTable.class);
-    }
-
-    static JCommander newJCommander() {
-        return new JCommander();
-    }
-
-    @SuppressWarnings("unchecked")
-    @VisibleForTesting
-    public static Object newCommandInstance(Class cls,
-                                            StorageClientSettings.Builder settingsBuilder)
-            throws Exception {
-        return cls.getConstructor(StorageClientSettings.Builder.class)
-            .newInstance(settingsBuilder);
-    }
-
-    public static void registerSubcommand(String commandName, Class commandClass) {
-        synchronized (commandMap) {
-            commandMap.put(commandName, commandClass);
-        }
-    }
-
-    public static void unregisterSubcommand(String commandName) {
-        synchronized (commandMap) {
-            commandMap.remove(commandName);
-        }
-    }
-
-    @Getter(AccessLevel.PACKAGE)
-    static class ShellArguments {
-
-        @Parameter(names = { "-u", "--server-uri" }, description = "The bookkeeper service uri")
-        private String serviceUri = "bk://localhost:4181";
-
-        @Parameter(names = { "-n", "--namespace" }, description = "Namespace")
-        private String namespace = "default";
-
-        @Parameter(names = { "-h", "--help" }, description = "Show this help message")
-        private boolean help = false;
-
-    }
-
-    @Getter(value = AccessLevel.PACKAGE)
-    private final ShellArguments shellArgs;
-    @Getter(value = AccessLevel.PACKAGE)
-    private final JCommander commander;
-    private final StorageClientSettings.Builder settingsBuilder;
-
-    StreamStorageCli() throws Exception {
-        this.shellArgs = new ShellArguments();
-        this.commander = newJCommander();
-        this.commander.setProgramName("stream-storage-cli");
-        this.commander.addObject(shellArgs);
-
-        this.settingsBuilder = StorageClientSettings.newBuilder()
-            .clientName("stream-storage-cli")
-            .usePlaintext(true);
-    }
-
-    void setupShell() {
-        for (Entry<String, Class> entry : commandMap.entrySet()) {
-            try {
-                Object obj = newCommandInstance(entry.getValue(), settingsBuilder);
-                log.info("Setup command {}", entry.getValue());
-                this.commander.addCommand(
-                    entry.getKey(),
-                    obj);
-            } catch (Exception e) {
-                System.err.println("Fail to load sub command '" + entry.getKey() + "' : " + e.getMessage());
-                e.printStackTrace();
-                System.exit(1);
-            }
-        }
-    }
-
-    boolean run(String[] args) {
-        setupShell();
-        if (args.length == 0) {
-            commander.usage();
-            return false;
-        }
-
-        int cmdPos;
-        for (cmdPos = 0; cmdPos < args.length; cmdPos++) {
-            if (commandMap.containsKey(args[cmdPos])) {
-                break;
-            }
-        }
-
-        try {
-            commander.parse(Arrays.copyOfRange(args, 0, Math.min(cmdPos, args.length)));
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-        if (shellArgs.help) {
-            commander.usage();
-            return false;
-        }
-
-        if (null == shellArgs.serviceUri) {
-            System.err.println("No endpoint is provided");
-            commander.usage();
-            return false;
-        }
-
-        settingsBuilder.serviceUri(shellArgs.serviceUri);
-
-        log.info("connecting to storage service = {}", shellArgs.serviceUri);
-
-        if (cmdPos == args.length) {
-            commander.usage();
-            return false;
-        } else {
-            String cmd = args[cmdPos];
-            JCommander subCmd = commander.getCommands().get(cmd);
-            CmdBase subCmdObj = (CmdBase) subCmd.getObjects().get(0);
-            String[] subCmdArgs = Arrays.copyOfRange(args, cmdPos + 1, args.length);
-
-            return subCmdObj.run(shellArgs.namespace, subCmdArgs);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        StreamStorageCli shell = new StreamStorageCli();
-
-        if (shell.run(args)) {
-            System.exit(0);
-        } else {
-            System.exit(1);
-        }
-    }
-
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
deleted file mode 100644
index 38f0160c5..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * An admin command interface provides a run method to execute admin commands.
- */
-public abstract class ClientCommand implements SubCommand {
-
-    @Override
-    public void run(String namespace, StorageClientSettings settings) throws Exception {
-        try (StorageClient client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build()) {
-            run(client);
-        }
-    }
-
-    protected abstract void run(StorageClient client) throws Exception;
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java
deleted file mode 100644
index b4991b4dc..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * The command base for other sub commands to extend.
- */
-@Slf4j
-public abstract class CmdBase {
-
-    // Parameters defined for this command
-
-    @Parameter(names = { "-h", "--help" }, help = true, hidden = true)
-    private boolean help;
-
-    // Parameters defined for this command (end)
-
-    protected final JCommander commander;
-    @Getter(AccessLevel.PUBLIC)
-    protected final StorageClientSettings.Builder settingsBuilder;
-
-    protected CmdBase(String cmdName, StorageClientSettings.Builder settingsBuilder) {
-        this(cmdName, settingsBuilder, new JCommander());
-    }
-
-    protected CmdBase(String cmdName, StorageClientSettings.Builder settingsBuilder, JCommander commander) {
-        this.settingsBuilder = settingsBuilder;
-        this.commander = commander;
-        this.commander.setProgramName("bookie-shell " + cmdName);
-    }
-
-    protected void addSubCommand(SubCommand subCommand) {
-        this.commander.addCommand(subCommand.name(), subCommand);
-    }
-
-    public boolean run(String namespace, String[] args) {
-        try {
-            commander.parse(args);
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-        String cmd = commander.getParsedCommand();
-        if (null == cmd) {
-            commander.usage();
-            return false;
-        }
-
-        JCommander cmdObj = commander.getCommands().get(cmd);
-        SubCommand subCmd = (SubCommand) cmdObj.getObjects().get(0);
-
-
-        try {
-            subCmd.run(namespace, settingsBuilder.build());
-            return true;
-        } catch (Exception e) {
-            System.err.println("Failed to execute command '" + cmd + "' : " + e.getMessage());
-            e.printStackTrace();
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-    }
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java
deleted file mode 100644
index 8a0c9b74f..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.stream.cli.commands.stream.CreateStreamCommand;
-
-/**
- * Commands that operates a single bookie.
- */
-@Parameters(commandDescription = "Commands on operating namespaces")
-public class CmdStream extends CmdBase {
-    public CmdStream(StorageClientSettings.Builder settingsBuilder) {
-        super("stream", settingsBuilder);
-        addSubCommand(new CreateStreamCommand());
-    }
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java
deleted file mode 100644
index ff0c3dc48..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * A basic command interface provides a run method to execute it.
- */
-public interface SubCommand {
-
-    /**
-     * SubCommand name.
-     *
-     * @return command name.
-     */
-    String name();
-
-    /**
-     * Run the command with provided configuration.
-     */
-    void run(String namespace, StorageClientSettings settings) throws Exception;
-
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java
deleted file mode 100644
index 0fa14a840..000000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands.stream;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-
-/**
- * Command to create a namespace.
- */
-@Parameters(commandDescription = "Create a stream")
-public class CreateStreamCommand extends AdminCommand {
-
-    @Parameter(names = { "-n", "--namespace" }, description = "namespace name")
-    private String namespaceName;
-    @Parameter(names = { "-s", "--stream" }, description = "stream name")
-    private String streamName;
-
-    @Override
-    protected void run(String defaultNamespace, StorageAdminClient admin) throws Exception {
-        checkNotNull(streamName, "Stream name is not provided");
-        System.out.println("Creating stream '" + streamName + "' ...");
-        StreamProperties nsProps = result(
-            admin.createStream(
-                null == namespaceName ? defaultNamespace : namespaceName,
-                streamName,
-                DEFAULT_STREAM_CONF));
-        System.out.println("Successfully created stream '" + streamName + "':");
-        System.out.println(nsProps);
-    }
-
-    @Override
-    public String name() {
-        return "create";
-    }
-}
diff --git a/stream/pom.xml b/stream/pom.xml
index b4a4462fd..1f8e39c7c 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,7 +38,6 @@
     <module>clients</module>
     <module>storage</module>
     <module>server</module>
-    <module>cli</module>
   </modules>
 
   <build>
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
index 61e5fa3b7..5808186bb 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -41,7 +41,8 @@
      *
      * @param metadataServiceUri metadata service uri
      * @param numStorageContainers number storage containers
+     * @return true if this call initialized the cluster; false if the cluster was already initialized.
      */
-    void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+    boolean initializeCluster(URI metadataServiceUri, int numStorageContainers);
 
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index d46f6338a..c6545c18f 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -31,8 +31,8 @@
 public interface ClusterMetadataStore extends AutoCloseable {
 
 
-    default void initializeCluster(int numStorageContainers) {
-        initializeCluster(numStorageContainers, Optional.empty());
+    default boolean initializeCluster(int numStorageContainers) {
+        return initializeCluster(numStorageContainers, Optional.empty());
     }
 
     /**
@@ -40,8 +40,9 @@ default void initializeCluster(int numStorageContainers) {
      *
      * @param numStorageContainers number of storage containers.
      * @param segmentStorePath segment store path
+     * @return true if this call initialized the cluster; false if the cluster was already initialized.
      */
-    void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
+    boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
 
     /**
      * Get the current cluster assignment data.
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index 4313ce156..bea4b2309 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -54,12 +54,13 @@ synchronized int getNumWatchers() {
     }
 
     @Override
-    public synchronized void initializeCluster(int numStorageContainers,
+    public synchronized boolean initializeCluster(int numStorageContainers,
                                                Optional<String> segmentStorePath) {
         this.metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
         this.assignmentData = ClusterAssignmentData.newBuilder().build();
+        return true;
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
index 832f15d67..53e749d61 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -51,7 +51,7 @@ public boolean acceptsURI(URI metadataServiceUri) {
     }
 
     @Override
-    public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
+    public boolean initializeCluster(URI metadataServiceUri, int numStorageContainers) {
         String zkInternalConnectString = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
         // 1) `zkExternalConnectString` are the public endpoints, where the tool can interact with.
         //    It allows the tools running outside of the cluster. It is useful for being used in dockerized environment.
@@ -70,6 +70,7 @@ public void initializeCluster(URI metadataServiceUri, int numStorageContainers)
             try {
                 metadata = store.getClusterMetadata();
                 log.info("Loaded cluster metadata : \n{}", metadata);
+                return false;
             } catch (StorageRuntimeException sre) {
                 if (sre.getCause() instanceof KeeperException.NoNodeException) {
                     log.info("Initializing the stream cluster with {} storage containers with segment store path {}.",
@@ -83,8 +84,9 @@ public void initializeCluster(URI metadataServiceUri, int numStorageContainers)
                         segmentStorePath = Optional.of(ledgersPath);
                     }
 
-                    store.initializeCluster(numStorageContainers, segmentStorePath);
+                    boolean initialized = store.initializeCluster(numStorageContainers, segmentStorePath);
                     log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                    return initialized;
                 } else {
                     throw sre;
                 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index 59af968d9..6f76dc5f7 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -42,6 +42,7 @@
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * A zookeeper based implementation of cluster metadata store.
@@ -93,7 +94,7 @@ public void close() {
     }
 
     @Override
-    public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
+    public boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
         ClusterMetadata metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
@@ -113,7 +114,13 @@ public void initializeCluster(int numStorageContainers, Optional<String> segment
                     client.transactionOp().create().forPath(getServersPath(zkRootPath)),
                     client.transactionOp().create().forPath(getWritableServersPath(zkRootPath)),
                     client.transactionOp().create().forPath(getStoragePath(zkRootPath), dlogMetadata.serialize()));
+            return true;
         } catch (Exception e) {
+            if (e instanceof KeeperException.NodeExistsException) {
+                // the cluster already exists.
+                log.info("Stream storage cluster is already initialized.");
+                return false;
+            }
             throw new StorageRuntimeException("Failed to initialize storage cluster with "
                 + numStorageContainers + " storage containers", e);
         }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 7d1aa31c0..721668ef5 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -83,8 +83,8 @@ public void setup() {
         ClusterMetadataStore originalStore = metadataStore;
         this.metadataStore = new ClusterMetadataStore() {
             @Override
-            public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
-                originalStore.initializeCluster(numStorageContainers);
+            public boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
+                return originalStore.initializeCluster(numStorageContainers);
             }
 
             @Override
diff --git a/tests/docker-images/current-version-image/Dockerfile b/tests/docker-images/current-version-image/Dockerfile
index 892eb3594..c174b6a31 100644
--- a/tests/docker-images/current-version-image/Dockerfile
+++ b/tests/docker-images/current-version-image/Dockerfile
@@ -25,7 +25,9 @@ ARG DISTRO_NAME=bookkeeper-dist-server-${BK_VERSION}-bin
 ARG PKG_NAME=bookkeeper-server-${BK_VERSION}
 
 ENV BOOKIE_PORT=3181
-EXPOSE $BOOKIE_PORT
+ENV BOOKIE_HTTP_PORT=8080
+ENV BOOKIE_GRPC_PORT=4181
+EXPOSE ${BOOKIE_PORT} ${BOOKIE_HTTP_PORT} ${BOOKIE_GRPC_PORT}
 ENV BK_USER=bookkeeper
 ENV BK_HOME=/opt/bookkeeper
 ENV JAVA_HOME=/usr/lib/jvm/jre-1.8.0
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java
new file mode 100644
index 000000000..0bb11a136
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.tests.integration.stream;
+
+import static org.junit.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.containers.BookieContainer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.testcontainers.containers.Container.ExecResult;
+
+/**
+ * Integration test for `bkctl`.
+ */
+@Slf4j
+public class BkCtlTest extends StreamClusterTestBase {
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    //
+    // `bookies` commands
+    //
+
+    @Test
+    public void listBookies() throws Exception {
+        BookieContainer bookie = bkCluster.getAnyBookie();
+        ExecResult result = bookie.execCmd(
+            BKCTL,
+            "bookies",
+            "list"
+        );
+        String output = result.getStdout();
+        assertTrue(output.contains("ReadWrite Bookies :"));
+    }
+
+    //
+    // `bookie` commands
+    //
+
+    @Test
+    public void showLastMark() throws Exception {
+        BookieContainer bookie = bkCluster.getAnyBookie();
+        ExecResult result = bookie.execCmd(
+            BKCTL,
+            "bookie",
+            "lastmark"
+        );
+        assertTrue(result.getStdout().contains("LastLogMark : Journal"));
+    }
+
+    //
+    // `ledger` commands
+    //
+
+    @Test
+    public void simpleTest() throws Exception {
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            "ledger",
+            "simpletest",
+            "--ensemble-size", "3",
+            "--write-quorum-size", "3",
+            "--ack-quorum-size", "2",
+            "--num-entries", "100"
+        );
+        assertTrue(
+            result.getStdout().contains("100 entries written to ledger"));
+    }
+
+    //
+    // `namespace` commands
+    //
+
+    @Test
+    public void createNamespace() throws Exception {
+        String nsName = testName.getMethodName();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "namespace",
+            "create",
+            nsName
+        );
+        assertTrue(
+            result.getStdout().contains("Successfully created namespace '" + nsName + "'"));
+    }
+
+    //
+    // `tables` commands
+    //
+
+    @Test
+    public void createTable() throws Exception {
+        String tableName = testName.getMethodName();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "tables",
+            "create",
+            tableName
+        );
+        assertTrue(
+            result.getStdout().contains("Successfully created stream '" + tableName + "'"));
+    }
+
+    //
+    // `table` commands
+    //
+
+    @Test
+    public void putGetKey() throws Exception {
+        String key = testName.getMethodName() + "-key";
+        String value = testName.getMethodName() + "-value";
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "put",
+            TEST_TABLE,
+            key,
+            value
+        );
+        assertTrue(
+            result.getStdout().contains(String.format("Successfully update kv: ('%s', '%s')",
+                key, value
+                )));
+
+        result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "get",
+            TEST_TABLE,
+            key);
+        assertTrue(
+            result.getStdout().contains(String.format("value = %s", value)));
+    }
+
+    @Test
+    public void incGetKey() throws Exception {
+        String key = testName.getMethodName() + "-key";
+        long value = System.currentTimeMillis();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "inc",
+            TEST_TABLE,
+            key,
+            "" + value
+        );
+        assertTrue(
+            result.getStdout().contains(String.format("Successfully increment kv: ('%s', amount = '%s').",
+                key, value
+                )));
+
+        result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "get",
+            TEST_TABLE,
+            key);
+        assertTrue(
+            result.getStdout().contains(String.format("value = %s", value)));
+    }
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index a8a3e6e62..bbde846be 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -41,6 +41,9 @@
 public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
 
     protected static Random rand = new Random();
+    protected static final String BKCTL = "/opt/bookkeeper/bin/bkctl";
+    protected static final String STREAM_URI = "--service-uri bk://localhost:4181";
+    protected static final String TEST_TABLE = "test-table";
 
     @BeforeClass
     public static void setupCluster() throws Exception {
@@ -54,6 +57,18 @@ public static void setupCluster() throws Exception {
             .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
             .build();
         BookKeeperClusterTestBase.setupCluster(spec);
+        bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "namespace",
+            "create",
+            "default");
+        bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "tables",
+            "create",
+            TEST_TABLE);
     }
 
     @AfterClass
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 9a7cd8674..0d5b311a9 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Strings;
@@ -25,6 +26,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -161,6 +163,14 @@ public synchronized BookieContainer getBookie(String bookieName) {
         return bookieContainers.get(bookieName);
     }
 
+    public synchronized BookieContainer getAnyBookie() {
+        List<BookieContainer> bookieList = Lists.newArrayList();
+        bookieList.addAll(bookieContainers.values());
+        Collections.shuffle(bookieList);
+        checkArgument(!bookieList.isEmpty(), "No bookie is alive");
+        return bookieList.get(0);
+    }
+
     public BookieContainer killBookie(String bookieName) {
         BookieContainer container;
         synchronized (this) {
diff --git a/tools/all/pom.xml b/tools/all/pom.xml
index db6d91dec..266deeb49 100644
--- a/tools/all/pom.xml
+++ b/tools/all/pom.xml
@@ -35,6 +35,11 @@
       <artifactId>bookkeeper-server</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>buildtools</artifactId>
@@ -49,4 +54,22 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  <profiles>
+    <profile>
+      <id>stream</id>
+      <activation>
+        <property>
+          <name>stream</name>
+        </property>
+      </activation>
+      <dependencies>
+        <!-- stream.storage -->
+        <dependency>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>stream-storage-cli</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>
diff --git a/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup b/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
index 47df897dd..44fc194f5 100644
--- a/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
+++ b/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
@@ -19,3 +19,7 @@
 org.apache.bookkeeper.tools.cli.commands.BookieCommandGroup
 org.apache.bookkeeper.tools.cli.commands.BookiesCommandGroup
 org.apache.bookkeeper.tools.cli.commands.LedgerCommandGroup
+org.apache.bookkeeper.stream.cli.ClusterCommandGroup
+org.apache.bookkeeper.stream.cli.NamespaceCommandGroup
+org.apache.bookkeeper.stream.cli.TableAdminCommandGroup
+org.apache.bookkeeper.stream.cli.TableCommandGroup
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
index 2f2724b92..bf591abf4 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
@@ -77,7 +77,7 @@ protected boolean apply(BKFlags bkFlags, CommandFlagsT cmdFlags) {
             }
         }
 
-        return apply(serviceURI, conf, cmdFlags);
+        return apply(serviceURI, conf, bkFlags, cmdFlags);
     }
 
     protected boolean acceptServiceUri(ServiceURI serviceURI) {
@@ -86,6 +86,7 @@ protected boolean acceptServiceUri(ServiceURI serviceURI) {
 
     protected abstract boolean apply(ServiceURI serviceURI,
                                      CompositeConfiguration conf,
+                                     BKFlags globalFlags,
                                      CommandFlagsT cmdFlags);
 
 }
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
index 890a3df0f..6b6c3920b 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
@@ -19,11 +19,13 @@
 package org.apache.bookkeeper.tools.common;
 
 import com.beust.jcommander.Parameter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 
 /**
  * Default BK flags.
  */
+@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
 public final class BKFlags extends CliFlags {
 
     @Parameter(
@@ -40,4 +42,11 @@
         description = "Configuration file")
     public String configFile = null;
 
+    @Parameter(
+        names = {
+            "-n", "--namespace"
+        },
+        description = "Namespace scope to run commands (only valid for table service for now)")
+    public String namespace = "default";
+
 }
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
index 44b210ff2..83cf9b9b6 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
@@ -231,6 +231,7 @@ boolean run(String[] args) {
             try {
                 return command.apply(spec.flags(), subCmdArgs);
             } catch (Exception e) {
+                e.printStackTrace(spec.console());
                 usage(e.getMessage());
                 return false;
             }
diff --git a/tools/pom.xml b/tools/pom.xml
index 46b776390..ee48821b9 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -29,4 +29,19 @@
     <module>framework</module>
     <module>all</module>
   </modules>
+  <profiles>
+  <!-- include stream storage cli only when -Dstream is provided -->
+    <profile>
+      <id>stream</id>
+      <activation>
+        <property>
+          <name>stream</name>
+        </property>
+      </activation>
+      <modules>
+        <!-- enable building stream storage cli -->
+        <module>stream</module>
+      </modules>
+    </profile>
+  </profiles>
 </project>
diff --git a/stream/cli/pom.xml b/tools/stream/pom.xml
similarity index 70%
rename from stream/cli/pom.xml
rename to tools/stream/pom.xml
index b1e3aea47..8bfad0ea9 100644
--- a/stream/cli/pom.xml
+++ b/tools/stream/pom.xml
@@ -21,28 +21,33 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
+    <artifactId>bookkeeper-tools-parent</artifactId>
     <version>4.8.0-SNAPSHOT</version>
   </parent>
   <artifactId>stream-storage-cli</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: CLI</name>
+  <name>Apache BookKeeper :: Tools :: Stream</name>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.beust</groupId>
-      <artifactId>jcommander</artifactId>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-service-impl</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-tools-framework</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>runtime</scope>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
     </dependency>
   </dependencies>
-  <build>
-  </build>
+
 </project>
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java
new file mode 100644
index 000000000..375bc6610
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli;
+
+import org.apache.bookkeeper.stream.cli.commands.cluster.InitClusterCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * Commands that operate stream storage clusters.
+ */
+public class ClusterCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "cluster";
+    private static final String DESC = "Commands on administrating bookkeeper clusters";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new InitClusterCommand())
+        .build();
+
+    public ClusterCommandGroup() {
+        super(spec);
+    }
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
similarity index 54%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
index 20f9cf19e..be9e06295 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
@@ -15,19 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.stream.cli.commands.namespace.CreateNamespaceCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands that operates a single bookie.
+ * Commands that operate stream storage namespaces.
  */
-@Parameters(commandDescription = "Commands on operating namespaces")
-public class CmdNamespace extends CmdBase {
-    public CmdNamespace(StorageClientSettings.Builder settingsBuilder) {
-        super("namespace", settingsBuilder);
-        addSubCommand(new CreateNamespaceCommand());
+public class NamespaceCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "namespace";
+    private static final String DESC = "Commands on operating namespaces";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new CreateNamespaceCommand())
+        .build();
+
+    public NamespaceCommandGroup() {
+        super(spec);
     }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
similarity index 51%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
index eaf770e1e..60b98928e 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
@@ -15,25 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * An admin command interface provides a run method to execute admin commands.
+ * Commands that administer tables.
  */
-public abstract class AdminCommand implements SubCommand {
+public class TableAdminCommandGroup extends CliCommandGroup<BKFlags> {
 
-    @Override
-    public void run(String namespace, StorageClientSettings settings) throws Exception {
-        try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin()) {
-            run(namespace, admin);
-        }
-    }
+    private static final String NAME = "tables";
+    private static final String DESC = "Commands on operating tables";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new CreateTableCommand())
+        .build();
 
-    protected abstract void run(String namespace, StorageAdminClient admin) throws Exception;
+    public TableAdminCommandGroup() {
+        super(spec);
+    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
similarity index 56%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
index 6b5e084d4..09d3be61b 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
@@ -15,23 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings.Builder;
 import org.apache.bookkeeper.stream.cli.commands.table.GetCommand;
 import org.apache.bookkeeper.stream.cli.commands.table.IncrementCommand;
 import org.apache.bookkeeper.stream.cli.commands.table.PutCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands that operates a single bookie.
+ * Commands that interact with tables.
  */
-@Parameters(commandDescription = "Commands on operating tables")
-public class CmdTable extends CmdBase {
-    public CmdTable(Builder settingsBuilder) {
-        super("table", settingsBuilder);
-        addSubCommand(new PutCommand());
-        addSubCommand(new GetCommand());
-        addSubCommand(new IncrementCommand());
+public class TableCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "table";
+    private static final String DESC = "Commands on interacting with tables";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new PutCommand())
+        .addCommand(new GetCommand())
+        .addCommand(new IncrementCommand())
+        .build();
+
+    public TableCommandGroup() {
+        super(spec);
     }
 }
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
new file mode 100644
index 000000000..eb319aae8
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Base class for admin commands; it builds a storage admin client and passes it to the run method.
+ */
+@Slf4j
+public abstract class AdminCommand<ClientFlagsT extends CliFlags> extends BKCommand<ClientFlagsT> {
+
+    protected AdminCommand(CliSpec<ClientFlagsT> spec) {
+        super(spec);
+    }
+
+    @Override
+    protected boolean acceptServiceUri(ServiceURI serviceURI) {
+        return ServiceURI.SERVICE_BK.equals(serviceURI.getServiceName());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags bkFlags,
+                            ClientFlagsT cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service URI is provided");
+
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .clientName("bkctl")
+            .serviceUri(serviceURI.getUri().toString())
+            .build();
+
+        try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin()) {
+            run(admin, bkFlags, cmdFlags);
+            return true;
+        } catch (Exception e) {
+            log.error("Failed to process stream admin command", e);
+            return false;
+        }
+    }
+
+    protected abstract void run(StorageAdminClient admin, BKFlags globalFlags, ClientFlagsT cmdFlags)
+        throws Exception;
+}
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
new file mode 100644
index 000000000..03232a3b4
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Base class for client commands; it builds a storage client for the namespace and passes it to the run method.
+ */
+@Slf4j
+public abstract class ClientCommand<ClientFlagsT extends CliFlags> extends BKCommand<ClientFlagsT> {
+
+    protected ClientCommand(CliSpec<ClientFlagsT> spec) {
+        super(spec);
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags,
+                            ClientFlagsT cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service uri is provided");
+
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .clientName("bkctl")
+            .serviceUri(serviceURI.getUri().toString())
+            .build();
+
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(globalFlags.namespace)
+            .build()) {
+            run(client, cmdFlags);
+            return true;
+        } catch (Exception e) {
+            log.error("Failed to process commands under namespace '{}'",
+                globalFlags.namespace, e);
+            return false;
+        }
+    }
+
+    protected abstract void run(StorageClient client,
+                                ClientFlagsT cmdFlags) throws Exception;
+}
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java
new file mode 100644
index 000000000..2337f000d
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.cli.commands.cluster;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.stream.cli.commands.cluster.InitClusterCommand.Flags;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * Command to init a cluster.
+ */
+@Slf4j
+public class InitClusterCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "init";
+    private static final String DESC = "Init a cluster";
+
+    /**
+     * Flags for the init cluster command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-x", "--cluster-name"
+            },
+            description = "cluster name. this would be used as a root path for storing metadata.")
+        private String clusterName = "";
+
+        @Parameter(
+            names = {
+                "-l", "--ledgers-path"
+            },
+            description = "root path to store ledgers' metadata")
+        private String ledgersPath = "/ledgers";
+
+        @Parameter(
+            names = {
+                "-dl", "--dlog-path"
+            },
+            description = "root path to store dlog metadata")
+        private String dlogPath = "/distributedlog";
+
+        @Parameter(
+            names = {
+                "-n", "--num-storage-containers"
+            },
+            description = "num of storage containers allocated for stream storage")
+        private int numStorageContainers = 32;
+
+    }
+
+    public InitClusterCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean acceptServiceUri(ServiceURI serviceURI) {
+        // only support zookeeper now
+        return ServiceURI.SERVICE_ZK.equals(serviceURI.getServiceName());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags,
+                            Flags cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service URI is provided");
+
+        if (null != cmdFlags.clusterName) {
+            checkArgument(
+                !cmdFlags.clusterName.contains("/"),
+                "Invalid cluster name : " + cmdFlags.clusterName);
+        }
+
+        checkArgument(
+            !Strings.isNullOrEmpty(cmdFlags.ledgersPath)
+                && cmdFlags.ledgersPath.startsWith("/"),
+            "Invalid ledgers root metadata path : " + cmdFlags.ledgersPath);
+
+        checkArgument(
+            !Strings.isNullOrEmpty(cmdFlags.dlogPath)
+                && cmdFlags.dlogPath.startsWith("/"),
+            "Invalid dlog root metadata path : " + cmdFlags.dlogPath);
+
+        checkArgument(
+            cmdFlags.numStorageContainers > 0,
+            "Zero or negative number of storage containers configured");
+
+        String clusterName = null == cmdFlags.clusterName ? "" : cmdFlags.clusterName;
+        String ledgersPath = getFullyQualifiedPath(clusterName, cmdFlags.ledgersPath);
+        String dlogPath = getFullyQualifiedPath(clusterName, cmdFlags.dlogPath);
+
+        String metadataServiceHosts = StringUtils.join(serviceURI.getServiceHosts(), ",");
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            metadataServiceHosts,
+            new BoundedExponentialBackoffRetry(100, 10000, 20)
+        )) {
+            client.start();
+
+            URI uri = serviceURI.getUri();
+            URI rootUri = new URI(
+                uri.getScheme(),
+                uri.getAuthority(),
+                "",
+                null,
+                null);
+
+            String ledgersUri = rootUri.toString() + ledgersPath;
+            String dlogUri = rootUri.toString() + dlogPath;
+
+            log.info("Initializing cluster {} : \n"
+                + "\tledgers : path = {}, uri = {}\n"
+                + "\tdlog: path = {}, uri = {}\n"
+                + "\tstream storage: path = {}, num_storage_containers = {}",
+                clusterName,
+                ledgersPath, ledgersUri,
+                dlogPath, dlogUri,
+                StorageConstants.ZK_METADATA_ROOT_PATH, cmdFlags.numStorageContainers);
+
+            // create the cluster root path
+            initializeCluster(client, clusterName);
+
+            // init the ledgers metadata
+            initLedgersMetadata(ledgersUri);
+
+            // init the dlog metadata
+            initDlogMetadata(client, metadataServiceHosts, dlogUri, dlogPath, ledgersPath);
+
+            // init the stream storage metadata
+            initStreamStorageMetadata(
+                metadataServiceHosts,
+                ledgersUri,
+                cmdFlags.numStorageContainers);
+            log.info("Successfully initialized cluster {}", clusterName);
+            return true;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initializeCluster(CuratorFramework client,
+                                   String clusterName) throws Exception {
+        if (Strings.isNullOrEmpty(clusterName)) {
+            return;
+        }
+
+        String path = "/" + clusterName;
+        if (null == client.checkExists().forPath(path)) {
+            try {
+                client.create().forPath(path);
+            } catch (KeeperException.NodeExistsException ne) {
+                // someone created it already
+                return;
+            }
+        }
+    }
+
+    private void initLedgersMetadata(String ledgersUri) throws Exception {
+        MetadataDrivers.runFunctionWithRegistrationManager(
+            new ServerConfiguration().setMetadataServiceUri(ledgersUri),
+            rm -> {
+                try {
+                    if (rm.initNewCluster()) {
+                        log.info("Successfully initialized ledgers metadata at {}", ledgersUri);
+                    }
+                } catch (Exception e) {
+                    throw new UncheckedExecutionException("Failed to init ledgers metadata at " + ledgersUri, e);
+                }
+                return null;
+            }
+        );
+    }
+
+    private void initDlogMetadata(CuratorFramework client,
+                                  String metadataServiceHosts,
+                                  String dlogUri,
+                                  String dlogPath,
+                                  String ledgersPath) throws Exception {
+        BKDLConfig dlogConfig = new BKDLConfig(metadataServiceHosts, ledgersPath);
+        DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
+        // create the dlog metadata at dlogUri only when dlogPath does not exist yet
+        if (null == client.checkExists().forPath(dlogPath)) {
+            try {
+                dlogMetadata.create(URI.create(dlogUri));
+            } catch (ZKException zke) {
+                // losing a creation race (NODEEXISTS) is fine; any other failure must not be swallowed
+                if (Code.NODEEXISTS != zke.getKeeperExceptionCode()) {
+                    throw zke;
+                }
+            }
+        }
+    }
+
+    private void initStreamStorageMetadata(String metadataServiceHosts,
+                                           String ledgersUri,
+                                           int numStorageContainers) {
+        ZkClusterInitializer initializer = new ZkClusterInitializer(metadataServiceHosts);
+        if (initializer.initializeCluster(URI.create(ledgersUri), numStorageContainers)) {
+            log.info("Successfully initialized stream storage metadata at {}:{}",
+                metadataServiceHosts,
+                StorageConstants.ZK_METADATA_ROOT_PATH);
+        }
+    }
+
+    private static String getFullyQualifiedPath(String clusterName, String path) {
+        if (Strings.isNullOrEmpty(clusterName)) {
+            clusterName = "";
+        } else {
+            clusterName = "/" + clusterName;
+        }
+        return clusterName + path;
+    }
+
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
similarity index 87%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
index f262f827e..36c523b5e 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
@@ -17,6 +17,6 @@
  */
 
 /**
- * This package contains all the stream related commands.
+ * This package contains all the cluster related commands.
  */
-package org.apache.bookkeeper.stream.cli.commands.stream;
+package org.apache.bookkeeper.stream.cli.commands.cluster;
\ No newline at end of file
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
similarity index 54%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
index fa6af6ebd..6f5e43537 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
@@ -17,42 +17,60 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.namespace;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
+import org.apache.bookkeeper.stream.cli.commands.namespace.CreateNamespaceCommand.Flags;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Command to create a namespace.
  */
-@Parameters(commandDescription = "Create a namespace")
-public class CreateNamespaceCommand extends AdminCommand {
+public class CreateNamespaceCommand extends AdminCommand<Flags> {
 
-    @Parameter(names = { "-n", "--name" }, description = "namespace name")
-    private String namespaceName;
+    private static final String NAME = "create";
+    private static final String DESC = "Create a namespace";
+
+    /**
+     * Flags for the create namespace command.
+     */
+    public static class Flags extends CliFlags {
+    }
+
+    public CreateNamespaceCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<namespace-name>")
+            .build());
+    }
 
     @Override
-    protected void run(String namespace, StorageAdminClient admin) throws Exception {
-        checkNotNull(namespaceName, "Namespace name is not provided");
-        System.out.println("Creating namespace '" + namespaceName + "' ...");
+    protected void run(StorageAdminClient admin,
+                       BKFlags globalFlags,
+                       Flags flags) throws Exception {
+        checkArgument(!flags.arguments.isEmpty(),
+            "Namespace name is not provided");
+
+        String namespaceName = flags.arguments.get(0);
+
+        spec.console().println("Creating namespace '" + namespaceName + "' ...");
         NamespaceProperties nsProps = result(
             admin.createNamespace(
                 namespaceName,
                 NamespaceConfiguration.newBuilder()
                     .setDefaultStreamConf(DEFAULT_STREAM_CONF)
                     .build()));
-        System.out.println("Successfully created namespace '" + namespaceName + "':");
-        System.out.println(nsProps);
+        spec.console().println("Successfully created namespace '" + namespaceName + "':");
+        spec.console().println(nsProps);
     }
 
-    @Override
-    public String name() {
-        return "create";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
new file mode 100644
index 000000000..784c8c252
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands.table;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand.Flags;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * Command to create a table; the table is created as a stream through the admin client.
+ */
+public class CreateTableCommand extends AdminCommand<Flags> {
+
+    private static final String NAME = "create";
+    private static final String DESC = "Create a stream";
+
+    /**
+     * Flags for the create table command.
+     */
+    public static class Flags extends CliFlags {
+    }
+
+    public CreateTableCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<stream-name>")
+            .build());
+    }
+
+    @Override
+    protected void run(StorageAdminClient admin,
+                       BKFlags globalFlags,
+                       Flags flags) throws Exception {
+        checkArgument(!flags.arguments.isEmpty(),
+            "Stream name is not provided");
+
+        String streamName = flags.arguments.get(0);
+
+        spec.console().println("Creating stream '" + streamName + "' ...");
+        StreamProperties nsProps = result(
+            admin.createStream(
+                globalFlags.namespace,
+                streamName,
+                DEFAULT_STREAM_CONF));
+        spec.console().println("Successfully created stream '" + streamName + "':");
+        spec.console().println(nsProps);
+    }
+
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
similarity index 53%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
index 0b39da51e..b7090fcaa 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
@@ -17,12 +17,11 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -30,53 +29,69 @@
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.GetCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Command to get kv.
  */
-@Parameters(commandDescription = "Get key/value pair from a table")
-public class GetCommand extends ClientCommand {
+public class GetCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "get";
+    private static final String DESC = "Get key/value pair from a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags for the get kv command.
+     */
+    public static class Flags extends CliFlags {
 
-    @Parameter(names = { "-w", "--watch" }, description = "watch the value changes of a key")
-    private boolean watch = false;
+        @Parameter(names = { "-w", "--watch" }, description = "watch the value changes of a key")
+        private boolean watch = false;
+
+    }
+
+    public GetCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 2,
+            "table and key are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             long lastVersion = -1L;
             do {
-                try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+                try (KeyValue<ByteBuf, ByteBuf> kv =
+                         result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
                     if (null == kv) {
-                        System.out.println("key '" + key + "' doesn't exist.");
+                        spec.console().println("key '" + key + "' doesn't exist.");
                     } else {
                         if (kv.version() > lastVersion) {
                             if (kv.isNumber()) {
-                                System.out.println("value = " + kv.numberValue());
+                                spec.console().println("value = " + kv.numberValue());
                             } else {
-                                System.out.println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
+                                spec.console()
+                                    .println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
                             }
                             lastVersion = kv.version();
                         }
                     }
                 }
-                if (watch) {
+                if (flags.watch) {
                     Thread.sleep(1000);
                 }
-            } while (watch);
+            } while (flags.watch);
         }
     }
 
-    @Override
-    public String name() {
-        return "get";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
similarity index 53%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
index a04982879..0e8f2e536 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
@@ -17,48 +17,57 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.IncrementCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands to put kvs.
+ * Commands to increment amount of keys.
  */
-@Parameters(commandDescription = "Increment the amount of a key in a table")
-public class IncrementCommand extends ClientCommand {
+public class IncrementCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "inc";
+    private static final String DESC = "Increment the amount of a key in a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags of the increment command.
+     */
+    public static class Flags extends CliFlags {
+    }
 
-    @Parameter(names = { "-a", "--amount" }, description = "amount to increment")
-    private long amount = 0;
+    public IncrementCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key> <amount>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 3,
+            "table and key/amount are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
+        long amount = Long.parseLong(flags.arguments.get(2));
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             result(table.increment(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                 amount));
-            System.out.println("Successfully increment kv: ('" + key + "', amount = '" + amount + "').");
+            spec.console().println("Successfully increment kv: ('" + key + "', amount = '" + amount + "').");
         }
     }
 
-    @Override
-    public String name() {
-        return "incr";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
similarity index 56%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
index b9f537dd0..5c89c664a 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
@@ -17,49 +17,57 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.PutCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Commands to put kvs.
  */
-@Parameters(commandDescription = "Put key/value pair to a table")
-public class PutCommand extends ClientCommand {
+public class PutCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "put";
+    private static final String DESC = "Put key/value pair to a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags of the put command.
+     */
+    public static class Flags extends CliFlags {
+    }
 
-    @Parameter(names = { "-v", "--value" }, description = "value")
-    private String value = null;
+    public PutCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key> <value>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
-        checkNotNull(value, "Value is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 3,
+            "table and key/value are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
+        String value = flags.arguments.get(2);
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             result(table.put(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                 Unpooled.wrappedBuffer(value.getBytes(UTF_8))));
-            System.out.println("Successfully update kv: ('" + key + "', '" + value + "').");
+            spec.console().println("Successfully update kv: ('" + key + "', '" + value + "').");
         }
     }
 
-    @Override
-    public String name() {
-        return "put";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services