You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/07 06:10:26 UTC

[bookkeeper] branch master updated: [TABLE SERVICE] Move table service cli to bookkeeper-tools

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aad543  [TABLE SERVICE] Move table service cli to bookkeeper-tools
2aad543 is described below

commit 2aad54347870bddc03aa071c67eac18a23beadff
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Jun 6 23:10:18 2018 -0700

    [TABLE SERVICE] Move table service cli to bookkeeper-tools
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    This PR follows #1478 and moves the table service CLI to bookkeeper-tools,
    so that the different service components are now available in one CLI tool.
    
    *Changes*
    
    - move stream/cli module to tools/stream module
    - organize the table service commands into the following groups:
      - cluster
      - namespace
      - table
      - tables
    
    *Output*
    
    Example output:
    
    ```
    $ bin/bkctl
    bkctl interacts and operates Apache BookKeeper clusters
    
    Usage:  bkctl [flags] [command group] [commands]
    
    Commands:
    
        bookie          Commands on operating a single bookie
        bookies         Commands on operating a cluster of bookies
        cluster         Commands on administrating bookkeeper clusters
        ledger          Commands on interacting with ledgers
        namespace       Commands on operating namespaces
        table           Commands on interacting with tables
        tables          Commands on operating tables
    
        help            Display help information about it
    
    Flags:
    
        -c, --conf
            Configuration file
    
        -n, --namespace
            Namespace scope to run commands (only valid for table service for now)
    
        -u, --service-uri
            Service Uri
    
        -h, --help
            Display help information
    
    Use "bkctl [command] --help" or "bkctl help [command]" for more information
    about a command
    ```
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1485 from sijie/stream_cli
---
 bin/common.sh                                      |   4 +-
 bin/standalone.docker-compose                      |  27 ++-
 .../tools/cli/helpers/BookieCommand.java           |   2 +
 .../tools/cli/helpers/ClientCommand.java           |   2 +
 docker/scripts/common.sh                           |  36 ++-
 docker/scripts/init_bookie.sh                      |  73 ++----
 .../bookkeeper/stream/cli/StreamStorageCli.java    | 189 ---------------
 .../stream/cli/commands/AdminCommand.java          |  39 ---
 .../bookkeeper/stream/cli/commands/CmdBase.java    |  90 -------
 .../bookkeeper/stream/cli/commands/CmdStream.java  |  33 ---
 .../bookkeeper/stream/cli/commands/SubCommand.java |  39 ---
 .../cli/commands/stream/CreateStreamCommand.java   |  58 -----
 stream/pom.xml                                     |   1 -
 .../storage/api/cluster/ClusterInitializer.java    |   3 +-
 .../storage/api/cluster/ClusterMetadataStore.java  |   7 +-
 .../impl/cluster/InMemClusterMetadataStore.java    |   3 +-
 .../storage/impl/cluster/ZkClusterInitializer.java |   6 +-
 .../impl/cluster/ZkClusterMetadataStore.java       |   9 +-
 .../cluster/ClusterControllerLeaderImplTest.java   |   4 +-
 .../docker-images/current-version-image/Dockerfile |   4 +-
 .../tests/integration/stream/BkCtlTest.java        | 186 +++++++++++++++
 .../integration/stream/StreamClusterTestBase.java  |  15 ++
 .../tests/integration/topologies/BKCluster.java    |  10 +
 tools/all/pom.xml                                  |  23 ++
 ....apache.bookkeeper.tools.framework.CommandGroup |   4 +
 .../apache/bookkeeper/tools/common/BKCommand.java  |   3 +-
 .../apache/bookkeeper/tools/common/BKFlags.java    |   9 +
 .../org/apache/bookkeeper/tools/framework/Cli.java |   1 +
 tools/pom.xml                                      |  15 ++
 {stream/cli => tools/stream}/pom.xml               |  25 +-
 .../bookkeeper/stream/cli/ClusterCommandGroup.java |  25 +-
 .../stream/cli/NamespaceCommandGroup.java          |  28 ++-
 .../stream/cli/TableAdminCommandGroup.java         |  35 +--
 .../bookkeeper/stream/cli/TableCommandGroup.java   |  32 ++-
 .../stream/cli/commands/AdminCommand.java          |  75 ++++++
 .../stream/cli/commands/ClientCommand.java         |  72 ++++++
 .../cli/commands/cluster/InitClusterCommand.java   | 262 +++++++++++++++++++++
 .../stream/cli/commands/cluster}/package-info.java |   4 +-
 .../commands/namespace/CreateNamespaceCommand.java |  50 ++--
 .../cli/commands/namespace/package-info.java       |   0
 .../stream/cli/commands/package-info.java          |   0
 .../cli/commands/table/CreateTableCommand.java     |  74 ++++++
 .../stream/cli/commands/table/GetCommand.java      |  61 +++--
 .../cli/commands/table/IncrementCommand.java       |  49 ++--
 .../stream/cli/commands/table/PutCommand.java      |  48 ++--
 .../stream/cli/commands/table/package-info.java    |   0
 .../apache/bookkeeper/stream/cli/package-info.java |   0
 47 files changed, 1073 insertions(+), 662 deletions(-)

diff --git a/bin/common.sh b/bin/common.sh
index 0c048c9..0a0146f 100755
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -149,7 +149,7 @@ find_module_jar() {
       read -p "Do you want me to run \`mvn package -DskiptTests\` for you ? " answer
       case "${answer:0:1}" in
         y|Y )
-          mvn package -DskipTests
+          mvn package -DskipTests -Dstream
           ;;
         * )
           exit 1
@@ -183,7 +183,7 @@ add_maven_deps_to_classpath() {
   # clean it up and force us create a new one.
   f="${MODULE_PATH}/target/cached_classpath.txt"
   if [ ! -f ${f} ]; then
-    ${MVN} -f "${MODULE_PATH}/pom.xml" dependency:build-classpath -Dmdep.outputFile="target/cached_classpath.txt" &> /dev/null
+    ${MVN} -f "${MODULE_PATH}/pom.xml" -Dstream dependency:build-classpath -Dmdep.outputFile="target/cached_classpath.txt" &> /dev/null
   fi
 }
 
diff --git a/bin/standalone.docker-compose b/bin/standalone.docker-compose
index 25e9fcb..4cf1ffd 100755
--- a/bin/standalone.docker-compose
+++ b/bin/standalone.docker-compose
@@ -54,23 +54,30 @@ function gen_bookie_section() {
   local cluster=$1
   local bookie_name=$2
   local bookie_port=$3
-  local image=$4
+  local bookie_http_port=$4
+  local bookie_grpc_port=$5
+  local image=$6
   cat <<EOF
   ${bookie_name}:
     image: ${image}
     depends_on:
       - metadata-service
     environment:
+      # metadata service
       - BK_zkServers=metadata-service
-      - BK_metadataServiceUri=zk://metadata-service/ledgers
       - BK_zkLedgersRootPath=/ledgers
-      - BK_journalDirectory=/data/bookkeeper/journal
-      - BK_ledgerDirectories=/data/bookkeeper/ledgers
-      - BK_indexDirectories=/data/bookkeeper/ledgers
-      - BK_advertisedAddress=localhost
-      - BK_bookiePort=${bookie_port}
+      - BK_metadataServiceUri=zk://metadata-service/ledgers
+      # bookie
+      - BK_DATA_DIR=/data/bookkeeper
+      - BK_advertisedAddress=${bookie_name}
+      # bookie http
+      - BK_httpServerEnabled=true
+      # stream storage
+      - BK_extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
     ports:
-      - "${bookie_port}:${bookie_port}"
+      - "${bookie_port}:3181"
+      - "${bookie_http_port}:8080"
+      - "${bookie_grpc_port}:4181"
     volumes:
       - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/journal:/data/bookkeeper/journal"
       - "${DATA_ROOT_DIR}/${cluster}/${bookie_name}/ledgers:/data/bookkeeper/ledgers"
@@ -94,7 +101,9 @@ function generate_docker_compose_file() {
   local BI=0
   while [ ${BI} -lt $((num_bookies)) ]; do
     local bookie_port=$((3181 + BI))
-    local bookie_section=$(gen_bookie_section ${cluster} "bookie-${BI}" ${bookie_port} ${image})
+    local bookie_http_port=$((8080 + BI))
+    local bookie_grpc_port=$((4181 + BI))
+    local bookie_section=$(gen_bookie_section ${cluster} "bookie-${BI}" ${bookie_port} ${bookie_http_port} ${bookie_grpc_port} ${image})
     echo "${bookie_section}"        >> ${docker_compose_file}
     let BI=BI+1
   done
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
index 64c6308..a2d1bbc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/BookieCommand.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.tools.cli.helpers;
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -37,6 +38,7 @@ public abstract class BookieCommand<BookieFlagsT extends CliFlags> extends BKCom
     @Override
     protected boolean apply(ServiceURI serviceURI,
                             CompositeConfiguration conf,
+                            BKFlags globalFlags,
                             BookieFlagsT cmdFlags) {
         ServerConfiguration serverConf = new ServerConfiguration();
         serverConf.loadConf(conf);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
index 7738911..ae807df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommand.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -41,6 +42,7 @@ public abstract class ClientCommand<ClientFlagsT extends CliFlags> extends BKCom
     @Override
     protected boolean apply(ServiceURI serviceURI,
                             CompositeConfiguration conf,
+                            BKFlags globalFlags,
                             ClientFlagsT cmdFlags) {
         ClientConfiguration clientConf = new ClientConfiguration();
         clientConf.loadConf(conf);
diff --git a/docker/scripts/common.sh b/docker/scripts/common.sh
index ece24ba..5bbcd20 100755
--- a/docker/scripts/common.sh
+++ b/docker/scripts/common.sh
@@ -28,22 +28,48 @@ BK_CLUSTER_ROOT_PATH=${BK_CLUSTER_ROOT_PATH:-""}
 
 # bk env vars to replace values in config files
 export BK_HOME=/opt/bookkeeper
-export BK_bookiePort=${BK_bookiePort:-${PORT0}}
+# metadata service
 export BK_zkServers=${BK_zkServers}
 export BK_zkLedgersRootPath=${BK_zkLedgersRootPath:-"${BK_CLUSTER_ROOT_PATH}/ledgers"}
+export BK_metadataServiceUri=${BK_metadataServiceUri:-"zk://${BK_zkServers}${BK_zkLedgersRootPath}"}
+# bookie
+export BK_bookiePort=${BK_bookiePort:-${PORT0}}
+export BK_httpServerEnabled=${BK_httpServerEnabled:-"true"}
+export BK_httpServerPort=${BK_httpServerPort:-${BOOKIE_HTTP_PORT}}
 export BK_journalDirectory=${BK_journalDirectory:-${BK_DATA_DIR}/journal}
 export BK_ledgerDirectories=${BK_ledgerDirectories:-${BK_DATA_DIR}/ledgers}
-export BK_indexDirectories=${BK_indexDirectories:-${BK_DATA_DIR}/index}
-export BK_metadataServiceUri=${BK_metadataServiceUri:-"zk://${BK_zkServers}${BK_zkLedgersRootPath}"}
+export BK_indexDirectories=${BK_indexDirectories:-${BK_ledgerDirectories}}
+# dlog
 export BK_dlogRootPath=${BK_dlogRootPath:-"${BK_CLUSTER_ROOT_PATH}/distributedlog"}
+# stream storage
+export BK_NUM_STORAGE_CONTAINERS=${BK_NUM_STORAGE_CONTAINERS:-"32"}
+export BK_STREAM_STORAGE_ROOT_PATH=${BK_STREAM_STORAGE_ROOT_PATH:-"/stream"}
 
 echo "Environment Vars for bookie:"
-echo "  BK_bookiePort bookie service port is $BK_bookiePort"
+echo ""
+echo "  [metadata service]"
 echo "  BK_zkServers is $BK_zkServers"
-echo "  BK_DATA_DIR is $BK_DATA_DIR"
 echo "  BK_CLUSTER_ROOT_PATH is $BK_CLUSTER_ROOT_PATH"
 echo "  BK_metadataServiceUri is $BK_metadataServiceUri"
+echo ""
+echo "  [bookie]"
+echo "  BK_bookiePort bookie service port is $BK_bookiePort"
+echo "  BK_DATA_DIR is $BK_DATA_DIR"
+echo "  BK_journalDirectory is ${BK_journalDirectory}"
+echo "  BK_ledgerDirectories are ${BK_ledgerDirectories}"
+echo "  BK_indexDirectories are ${BK_indexDirectories}"
+echo ""
+echo "  [bookie http]"
+echo "  BK_httpServerEnabled is ${BK_httpServerEnabled}"
+echo "  BK_httpServerPort is ${BK_httpServerPort}"
+echo ""
+echo "  [dlog]"
 echo "  BK_dlogRootPath is $BK_dlogRootPath"
+echo ""
+echo "  [stream storage]"
+echo "  BK_STREAM_STORAGE_ROOT_PATH is ${BK_STREAM_STORAGE_ROOT_PATH}"
+echo "  BK_NUM_STORAGE_CONTAINERS is ${BK_NUM_STORAGE_CONTAINERS}"
+echo "  BOOKIE_GRPC_PORT is ${BOOKIE_GRPC_PORT}"
 
 python scripts/apply-config-from-env.py ${BK_HOME}/conf
 
diff --git a/docker/scripts/init_bookie.sh b/docker/scripts/init_bookie.sh
index 09aab1d..07a7fee 100755
--- a/docker/scripts/init_bookie.sh
+++ b/docker/scripts/init_bookie.sh
@@ -28,27 +28,34 @@ function wait_for_zookeeper() {
 }
 
 function create_zk_root() {
-    echo "create the zk root dir for bookkeeper"
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create ${BK_CLUSTER_ROOT_PATH}
+    if [ "x${BK_CLUSTER_ROOT_PATH}" != "x" ]; then
+        echo "create the zk root dir for bookkeeper at '${BK_CLUSTER_ROOT_PATH}'"
+        /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create ${BK_CLUSTER_ROOT_PATH}
+    fi
 }
 
 # Init the cluster if required znodes not exist in Zookeeper.
 # Use ephemeral zk node as lock to keep initialize atomic.
 function init_cluster() {
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_zkLedgersRootPath}/available/readonly
+    if [ "x${BK_STREAM_STORAGE_ROOT_PATH}" == "x" ]; then
+        echo "BK_STREAM_STORAGE_ROOT_PATH is not set. fail fast."
+        exit -1
+    fi
+
+    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}
     if [ $? -eq 0 ]; then
-        echo "Metadata of cluster already exists, no need format"
+        echo "Metadata of cluster already exists, no need to init"
     else
         # create ephemeral zk node bkInitLock, initiator who this node, then do init; other initiators will wait.
         /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create -e ${BK_CLUSTER_ROOT_PATH}/bkInitLock
         if [ $? -eq 0 ]; then
             # bkInitLock created success, this is the successor to do znode init
-            echo "Bookkeeper znodes not exist in Zookeeper, do the init to create them."
-            /opt/bookkeeper/bin/bookkeeper shell initnewcluster
+            echo "Initializing bookkeeper cluster at service uri ${BK_metadataServiceUri}."
+            /opt/bookkeeper/bin/bkctl --service-uri ${BK_metadataServiceUri} cluster init
             if [ $? -eq 0 ]; then
-                echo "Bookkeeper znodes init success."
+                echo "Successfully initialized bookkeeper cluster at service uri ${BK_metadataServiceUri}."
             else
-                echo "Bookkeeper znodes init failed. please check the reason."
+                echo "Failed to initialize bookkeeper cluster at service uri ${BK_metadataServiceUri}. please check the reason."
                 exit
             fi
         else
@@ -57,7 +64,8 @@ function init_cluster() {
             while [ ${tenSeconds} -lt 10 ]
             do
                 sleep 10
-                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_zkLedgersRootPath}/available/readonly
+                echo "run '/opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}'"
+                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_STREAM_STORAGE_ROOT_PATH}
                 if [ $? -eq 0 ]; then
                     echo "Waited $tenSeconds * 10 seconds, bookkeeper inited"
                     break
@@ -76,50 +84,6 @@ function init_cluster() {
     fi
 }
 
-# Create default dlog namespace
-# Use ephemeral zk node as lock to keep initialize atomic.
-function create_dlog_namespace() {
-    /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_dlogRootPath}
-    if [ $? -eq 0 ]; then
-        echo "Dlog namespace already created, no need to create another one"
-    else
-        # create ephemeral zk node dlogInitLock, initiator who this node, then do init; other initiators will wait.
-        /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} create -e ${BK_CLUSTER_ROOT_PATH}/dlogInitLock
-        if [ $? -eq 0 ]; then
-            # dlogInitLock created success, this is the successor to do znode init
-            echo "Dlog namespace not exist, do the init to create them."
-            /opt/bookkeeper/bin/dlog admin bind -l ${BK_zkLedgersRootPath} -s ${BK_zkServers} -c distributedlog://${BK_zkServers}${BK_dlogRootPath}
-            if [ $? -eq 0 ]; then
-                echo "Dlog namespace is created successfully."
-            else
-                echo "Failed to create dlog namespace ${BK_dlogRootPath}. please check the reason."
-                exit
-            fi
-        else
-            echo "Other docker instance is doing initialize at the same time, will wait in this instance."
-            tenSeconds=1
-            while [ ${tenSeconds} -lt 10 ]
-            do
-                sleep 10
-                /opt/bookkeeper/bin/bookkeeper org.apache.zookeeper.ZooKeeperMain -server ${BK_zkServers} stat ${BK_dlogRootPath}
-                if [ $? -eq 0 ]; then
-                    echo "Waited $tenSeconds * 10 seconds, dlog namespace created"
-                    break
-                else
-                    echo "Waited $tenSeconds * 10 seconds, dlog namespace still not created"
-                    (( tenSeconds++ ))
-                    continue
-                fi
-            done
-
-            if [ ${tenSeconds} -eq 10 ]; then
-                echo "Waited 100 seconds for creating dlog namespace, something wrong, please check"
-                exit
-            fi
-        fi
-    fi
-}
-
 function init_bookie() {
 
     # create dirs if they don't exist
@@ -134,7 +98,4 @@ function init_bookie() {
     # init the cluster
     init_cluster
 
-    # create dlog namespace
-    create_dlog_namespace
-
 }
\ No newline at end of file
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
deleted file mode 100644
index b1ac239..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.stream.cli.commands.CmdBase;
-import org.apache.bookkeeper.stream.cli.commands.CmdNamespace;
-import org.apache.bookkeeper.stream.cli.commands.CmdStream;
-import org.apache.bookkeeper.stream.cli.commands.CmdTable;
-
-/**
- * Bookie Shell.
- */
-@Slf4j
-public class StreamStorageCli {
-
-    /**
-     * Make this command map static. This provides a way to plugin different sub commands.
-     */
-    private static final Map<String, Class> commandMap;
-
-    static {
-        commandMap = new TreeMap<>();
-
-        // build the default command map
-        commandMap.put("namespace", CmdNamespace.class);
-        commandMap.put("stream", CmdStream.class);
-        commandMap.put("table", CmdTable.class);
-    }
-
-    static JCommander newJCommander() {
-        return new JCommander();
-    }
-
-    @SuppressWarnings("unchecked")
-    @VisibleForTesting
-    public static Object newCommandInstance(Class cls,
-                                            StorageClientSettings.Builder settingsBuilder)
-            throws Exception {
-        return cls.getConstructor(StorageClientSettings.Builder.class)
-            .newInstance(settingsBuilder);
-    }
-
-    public static void registerSubcommand(String commandName, Class commandClass) {
-        synchronized (commandMap) {
-            commandMap.put(commandName, commandClass);
-        }
-    }
-
-    public static void unregisterSubcommand(String commandName) {
-        synchronized (commandMap) {
-            commandMap.remove(commandName);
-        }
-    }
-
-    @Getter(AccessLevel.PACKAGE)
-    static class ShellArguments {
-
-        @Parameter(names = { "-u", "--server-uri" }, description = "The bookkeeper service uri")
-        private String serviceUri = "bk://localhost:4181";
-
-        @Parameter(names = { "-n", "--namespace" }, description = "Namespace")
-        private String namespace = "default";
-
-        @Parameter(names = { "-h", "--help" }, description = "Show this help message")
-        private boolean help = false;
-
-    }
-
-    @Getter(value = AccessLevel.PACKAGE)
-    private final ShellArguments shellArgs;
-    @Getter(value = AccessLevel.PACKAGE)
-    private final JCommander commander;
-    private final StorageClientSettings.Builder settingsBuilder;
-
-    StreamStorageCli() throws Exception {
-        this.shellArgs = new ShellArguments();
-        this.commander = newJCommander();
-        this.commander.setProgramName("stream-storage-cli");
-        this.commander.addObject(shellArgs);
-
-        this.settingsBuilder = StorageClientSettings.newBuilder()
-            .clientName("stream-storage-cli")
-            .usePlaintext(true);
-    }
-
-    void setupShell() {
-        for (Entry<String, Class> entry : commandMap.entrySet()) {
-            try {
-                Object obj = newCommandInstance(entry.getValue(), settingsBuilder);
-                log.info("Setup command {}", entry.getValue());
-                this.commander.addCommand(
-                    entry.getKey(),
-                    obj);
-            } catch (Exception e) {
-                System.err.println("Fail to load sub command '" + entry.getKey() + "' : " + e.getMessage());
-                e.printStackTrace();
-                System.exit(1);
-            }
-        }
-    }
-
-    boolean run(String[] args) {
-        setupShell();
-        if (args.length == 0) {
-            commander.usage();
-            return false;
-        }
-
-        int cmdPos;
-        for (cmdPos = 0; cmdPos < args.length; cmdPos++) {
-            if (commandMap.containsKey(args[cmdPos])) {
-                break;
-            }
-        }
-
-        try {
-            commander.parse(Arrays.copyOfRange(args, 0, Math.min(cmdPos, args.length)));
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-        if (shellArgs.help) {
-            commander.usage();
-            return false;
-        }
-
-        if (null == shellArgs.serviceUri) {
-            System.err.println("No endpoint is provided");
-            commander.usage();
-            return false;
-        }
-
-        settingsBuilder.serviceUri(shellArgs.serviceUri);
-
-        log.info("connecting to storage service = {}", shellArgs.serviceUri);
-
-        if (cmdPos == args.length) {
-            commander.usage();
-            return false;
-        } else {
-            String cmd = args[cmdPos];
-            JCommander subCmd = commander.getCommands().get(cmd);
-            CmdBase subCmdObj = (CmdBase) subCmd.getObjects().get(0);
-            String[] subCmdArgs = Arrays.copyOfRange(args, cmdPos + 1, args.length);
-
-            return subCmdObj.run(shellArgs.namespace, subCmdArgs);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        StreamStorageCli shell = new StreamStorageCli();
-
-        if (shell.run(args)) {
-            System.exit(0);
-        } else {
-            System.exit(1);
-        }
-    }
-
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
deleted file mode 100644
index eaf770e..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * An admin command interface provides a run method to execute admin commands.
- */
-public abstract class AdminCommand implements SubCommand {
-
-    @Override
-    public void run(String namespace, StorageClientSettings settings) throws Exception {
-        try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin()) {
-            run(namespace, admin);
-        }
-    }
-
-    protected abstract void run(String namespace, StorageAdminClient admin) throws Exception;
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java
deleted file mode 100644
index b4991b4..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * The command base for other sub commands to extend.
- */
-@Slf4j
-public abstract class CmdBase {
-
-    // Parameters defined for this command
-
-    @Parameter(names = { "-h", "--help" }, help = true, hidden = true)
-    private boolean help;
-
-    // Parameters defined for this command (end)
-
-    protected final JCommander commander;
-    @Getter(AccessLevel.PUBLIC)
-    protected final StorageClientSettings.Builder settingsBuilder;
-
-    protected CmdBase(String cmdName, StorageClientSettings.Builder settingsBuilder) {
-        this(cmdName, settingsBuilder, new JCommander());
-    }
-
-    protected CmdBase(String cmdName, StorageClientSettings.Builder settingsBuilder, JCommander commander) {
-        this.settingsBuilder = settingsBuilder;
-        this.commander = commander;
-        this.commander.setProgramName("bookie-shell " + cmdName);
-    }
-
-    protected void addSubCommand(SubCommand subCommand) {
-        this.commander.addCommand(subCommand.name(), subCommand);
-    }
-
-    public boolean run(String namespace, String[] args) {
-        try {
-            commander.parse(args);
-        } catch (Exception e) {
-            System.err.println(e.getMessage());
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-        String cmd = commander.getParsedCommand();
-        if (null == cmd) {
-            commander.usage();
-            return false;
-        }
-
-        JCommander cmdObj = commander.getCommands().get(cmd);
-        SubCommand subCmd = (SubCommand) cmdObj.getObjects().get(0);
-
-
-        try {
-            subCmd.run(namespace, settingsBuilder.build());
-            return true;
-        } catch (Exception e) {
-            System.err.println("Failed to execute command '" + cmd + "' : " + e.getMessage());
-            e.printStackTrace();
-            System.err.println();
-            commander.usage();
-            return false;
-        }
-
-    }
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java
deleted file mode 100644
index 8a0c9b7..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdStream.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.stream.cli.commands.stream.CreateStreamCommand;
-
-/**
- * Commands that operates a single bookie.
- */
-@Parameters(commandDescription = "Commands on operating namespaces")
-public class CmdStream extends CmdBase {
-    public CmdStream(StorageClientSettings.Builder settingsBuilder) {
-        super("stream", settingsBuilder);
-        addSubCommand(new CreateStreamCommand());
-    }
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java
deleted file mode 100644
index ff0c3dc..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/SubCommand.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands;
-
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-
-/**
- * A basic command interface provides a run method to execute it.
- */
-public interface SubCommand {
-
-    /**
-     * SubCommand name.
-     *
-     * @return command name.
-     */
-    String name();
-
-    /**
-     * Run the command with provided configuration.
-     */
-    void run(String namespace, StorageClientSettings settings) throws Exception;
-
-}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java
deleted file mode 100644
index 0fa14a8..0000000
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/CreateStreamCommand.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.stream.cli.commands.stream;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-
-/**
- * Command to create a namespace.
- */
-@Parameters(commandDescription = "Create a stream")
-public class CreateStreamCommand extends AdminCommand {
-
-    @Parameter(names = { "-n", "--namespace" }, description = "namespace name")
-    private String namespaceName;
-    @Parameter(names = { "-s", "--stream" }, description = "stream name")
-    private String streamName;
-
-    @Override
-    protected void run(String defaultNamespace, StorageAdminClient admin) throws Exception {
-        checkNotNull(streamName, "Stream name is not provided");
-        System.out.println("Creating stream '" + streamName + "' ...");
-        StreamProperties nsProps = result(
-            admin.createStream(
-                null == namespaceName ? defaultNamespace : namespaceName,
-                streamName,
-                DEFAULT_STREAM_CONF));
-        System.out.println("Successfully created stream '" + streamName + "':");
-        System.out.println(nsProps);
-    }
-
-    @Override
-    public String name() {
-        return "create";
-    }
-}
diff --git a/stream/pom.xml b/stream/pom.xml
index b4a4462..1f8e39c 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,7 +38,6 @@
     <module>clients</module>
     <module>storage</module>
     <module>server</module>
-    <module>cli</module>
   </modules>
 
   <build>
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
index 61e5fa3..5808186 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterInitializer.java
@@ -41,7 +41,8 @@ public interface ClusterInitializer {
      *
      * @param metadataServiceUri metadata service uri
      * @param numStorageContainers number storage containers
+     * @return true if this call initialized the cluster; false otherwise (e.g. it was already initialized).
      */
-    void initializeCluster(URI metadataServiceUri, int numStorageContainers);
+    boolean initializeCluster(URI metadataServiceUri, int numStorageContainers);
 
 }
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
index d46f633..c6545c1 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/cluster/ClusterMetadataStore.java
@@ -31,8 +31,8 @@ import org.apache.bookkeeper.stream.proto.cluster.ClusterMetadata;
 public interface ClusterMetadataStore extends AutoCloseable {
 
 
-    default void initializeCluster(int numStorageContainers) {
-        initializeCluster(numStorageContainers, Optional.empty());
+    default boolean initializeCluster(int numStorageContainers) {
+        return initializeCluster(numStorageContainers, Optional.empty());
     }
 
     /**
@@ -40,8 +40,9 @@ public interface ClusterMetadataStore extends AutoCloseable {
      *
      * @param numStorageContainers number of storage containers.
      * @param segmentStorePath segment store path
+     * @return true if this call initialized the cluster; false if the cluster already exists.
      */
-    void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
+    boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath);
 
     /**
      * Get the current cluster assignment data.
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
index 4313ce1..bea4b23 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/InMemClusterMetadataStore.java
@@ -54,12 +54,13 @@ public class InMemClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public synchronized void initializeCluster(int numStorageContainers,
+    public synchronized boolean initializeCluster(int numStorageContainers,
                                                Optional<String> segmentStorePath) {
         this.metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
         this.assignmentData = ClusterAssignmentData.newBuilder().build();
+        return true;
     }
 
     @Override
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
index 832f15d..53e749d 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterInitializer.java
@@ -51,7 +51,7 @@ public class ZkClusterInitializer implements ClusterInitializer  {
     }
 
     @Override
-    public void initializeCluster(URI metadataServiceUri, int numStorageContainers) {
+    public boolean initializeCluster(URI metadataServiceUri, int numStorageContainers) {
         String zkInternalConnectString = ZKMetadataDriverBase.getZKServersFromServiceUri(metadataServiceUri);
         // 1) `zkExternalConnectString` are the public endpoints, where the tool can interact with.
         //    It allows the tools running outside of the cluster. It is useful for being used in dockerized environment.
@@ -70,6 +70,7 @@ public class ZkClusterInitializer implements ClusterInitializer  {
             try {
                 metadata = store.getClusterMetadata();
                 log.info("Loaded cluster metadata : \n{}", metadata);
+                return false;
             } catch (StorageRuntimeException sre) {
                 if (sre.getCause() instanceof KeeperException.NoNodeException) {
                     log.info("Initializing the stream cluster with {} storage containers with segment store path {}.",
@@ -83,8 +84,9 @@ public class ZkClusterInitializer implements ClusterInitializer  {
                         segmentStorePath = Optional.of(ledgersPath);
                     }
 
-                    store.initializeCluster(numStorageContainers, segmentStorePath);
+                    boolean initialized = store.initializeCluster(numStorageContainers, segmentStorePath);
                     log.info("Successfully initialized the stream cluster : \n{}", store.getClusterMetadata());
+                    return initialized;
                 } else {
                     throw sre;
                 }
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
index 59af968..6f76dc5 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/cluster/ZkClusterMetadataStore.java
@@ -42,6 +42,7 @@ import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * A zookeeper based implementation of cluster metadata store.
@@ -93,7 +94,7 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
     }
 
     @Override
-    public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
+    public boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
         ClusterMetadata metadata = ClusterMetadata.newBuilder()
             .setNumStorageContainers(numStorageContainers)
             .build();
@@ -113,7 +114,13 @@ public class ZkClusterMetadataStore implements ClusterMetadataStore {
                     client.transactionOp().create().forPath(getServersPath(zkRootPath)),
                     client.transactionOp().create().forPath(getWritableServersPath(zkRootPath)),
                     client.transactionOp().create().forPath(getStoragePath(zkRootPath), dlogMetadata.serialize()));
+            return true;
         } catch (Exception e) {
+            if (e instanceof KeeperException.NodeExistsException) {
+                // the cluster already exists.
+                log.info("Stream storage cluster is already initialized.");
+                return false;
+            }
             throw new StorageRuntimeException("Failed to initialize storage cluster with "
                 + numStorageContainers + " storage containers", e);
         }
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
index 7d1aa31..721668e 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/cluster/ClusterControllerLeaderImplTest.java
@@ -83,8 +83,8 @@ public class ClusterControllerLeaderImplTest {
         ClusterMetadataStore originalStore = metadataStore;
         this.metadataStore = new ClusterMetadataStore() {
             @Override
-            public void initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
-                originalStore.initializeCluster(numStorageContainers);
+            public boolean initializeCluster(int numStorageContainers, Optional<String> segmentStorePath) {
+                return originalStore.initializeCluster(numStorageContainers);
             }
 
             @Override
diff --git a/tests/docker-images/current-version-image/Dockerfile b/tests/docker-images/current-version-image/Dockerfile
index 892eb35..c174b6a 100644
--- a/tests/docker-images/current-version-image/Dockerfile
+++ b/tests/docker-images/current-version-image/Dockerfile
@@ -25,7 +25,9 @@ ARG DISTRO_NAME=bookkeeper-dist-server-${BK_VERSION}-bin
 ARG PKG_NAME=bookkeeper-server-${BK_VERSION}
 
 ENV BOOKIE_PORT=3181
-EXPOSE $BOOKIE_PORT
+ENV BOOKIE_HTTP_PORT=8080
+ENV BOOKIE_GRPC_PORT=4181
+EXPOSE ${BOOKIE_PORT} ${BOOKIE_HTTP_PORT} ${BOOKIE_GRPC_PORT}
 ENV BK_USER=bookkeeper
 ENV BK_HOME=/opt/bookkeeper
 ENV JAVA_HOME=/usr/lib/jvm/jre-1.8.0
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java
new file mode 100644
index 0000000..0bb11a1
--- /dev/null
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/BkCtlTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.tests.integration.stream;
+
+import static org.junit.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tests.containers.BookieContainer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.testcontainers.containers.Container.ExecResult;
+
+/**
+ * Integration test for `bkctl`.
+ */
+@Slf4j
+public class BkCtlTest extends StreamClusterTestBase {
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    //
+    // `bookies` commands
+    //
+
+    @Test
+    public void listBookies() throws Exception {
+        BookieContainer bookie = bkCluster.getAnyBookie();
+        ExecResult result = bookie.execCmd(
+            BKCTL,
+            "bookies",
+            "list"
+        );
+        String output = result.getStdout();
+        assertTrue(output.contains("ReadWrite Bookies :"));
+    }
+
+    //
+    // `bookie` commands
+    //
+
+    @Test
+    public void showLastMark() throws Exception {
+        BookieContainer bookie = bkCluster.getAnyBookie();
+        ExecResult result = bookie.execCmd(
+            BKCTL,
+            "bookie",
+            "lastmark"
+        );
+        assertTrue(result.getStdout().contains("LastLogMark : Journal"));
+    }
+
+    //
+    // `ledger` commands
+    //
+
+    @Test
+    public void simpleTest() throws Exception {
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            "ledger",
+            "simpletest",
+            "--ensemble-size", "3",
+            "--write-quorum-size", "3",
+            "--ack-quorum-size", "2",
+            "--num-entries", "100"
+        );
+        assertTrue(
+            result.getStdout().contains("100 entries written to ledger"));
+    }
+
+    //
+    // `namespace` commands
+    //
+
+    @Test
+    public void createNamespace() throws Exception {
+        String nsName = testName.getMethodName();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "namespace",
+            "create",
+            nsName
+        );
+        assertTrue(
+            result.getStdout().contains("Successfully created namespace '" + nsName + "'"));
+    }
+
+    //
+    // `tables` commands
+    //
+
+    @Test
+    public void createTable() throws Exception {
+        String tableName = testName.getMethodName();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "tables",
+            "create",
+            tableName
+        );
+        assertTrue(
+            result.getStdout().contains("Successfully created stream '" + tableName + "'"));
+    }
+
+    //
+    // `table` commands
+    //
+
+    @Test
+    public void putGetKey() throws Exception {
+        String key = testName.getMethodName() + "-key";
+        String value = testName.getMethodName() + "-value";
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "put",
+            TEST_TABLE,
+            key,
+            value
+        );
+        assertTrue(
+            result.getStdout().contains(String.format("Successfully update kv: ('%s', '%s')",
+                key, value
+                )));
+
+        result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "get",
+            TEST_TABLE,
+            key);
+        assertTrue(
+            result.getStdout().contains(String.format("value = %s", value)));
+    }
+
+    @Test
+    public void incGetKey() throws Exception {
+        String key = testName.getMethodName() + "-key";
+        long value = System.currentTimeMillis();
+        ExecResult result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "inc",
+            TEST_TABLE,
+            key,
+            "" + value
+        );
+        assertTrue(
+            result.getStdout().contains(String.format("Successfully increment kv: ('%s', amount = '%s').",
+                key, value
+                )));
+
+        result = bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "table",
+            "get",
+            TEST_TABLE,
+            key);
+        assertTrue(
+            result.getStdout().contains(String.format("value = %s", value)));
+    }
+
+}
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index a8a3e6e..bbde846 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -41,6 +41,9 @@ import org.junit.BeforeClass;
 public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
 
     protected static Random rand = new Random();
+    protected static final String BKCTL = "/opt/bookkeeper/bin/bkctl";
+    protected static final String STREAM_URI = "--service-uri bk://localhost:4181";
+    protected static final String TEST_TABLE = "test-table";
 
     @BeforeClass
     public static void setupCluster() throws Exception {
@@ -54,6 +57,18 @@ public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase {
             .extraServerComponents("org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent")
             .build();
         BookKeeperClusterTestBase.setupCluster(spec);
+        bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "namespace",
+            "create",
+            "default");
+        bkCluster.getAnyBookie().execCmd(
+            BKCTL,
+            STREAM_URI,
+            "tables",
+            "create",
+            TEST_TABLE);
     }
 
     @AfterClass
diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 9a7cd86..0d5b311 100644
--- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Strings;
@@ -25,6 +26,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -161,6 +163,14 @@ public class BKCluster {
         return bookieContainers.get(bookieName);
     }
 
+    public synchronized BookieContainer getAnyBookie() {
+        List<BookieContainer> bookieList = Lists.newArrayList();
+        bookieList.addAll(bookieContainers.values());
+        Collections.shuffle(bookieList);
+        checkArgument(!bookieList.isEmpty(), "No bookie is alive");
+        return bookieList.get(0);
+    }
+
     public BookieContainer killBookie(String bookieName) {
         BookieContainer container;
         synchronized (this) {
diff --git a/tools/all/pom.xml b/tools/all/pom.xml
index db6d91d..266deeb 100644
--- a/tools/all/pom.xml
+++ b/tools/all/pom.xml
@@ -36,6 +36,11 @@
       <version>${project.parent.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>buildtools</artifactId>
       <version>${project.parent.version}</version>
@@ -49,4 +54,22 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  <profiles>
+    <profile>
+      <id>stream</id>
+      <activation>
+        <property>
+          <name>stream</name>
+        </property>
+      </activation>
+      <dependencies>
+        <!-- stream.storage -->
+        <dependency>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>stream-storage-cli</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>
diff --git a/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup b/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
index 47df897..44fc194 100644
--- a/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
+++ b/tools/all/src/main/resources/META-INF/services/org.apache.bookkeeper.tools.framework.CommandGroup
@@ -19,3 +19,7 @@
 org.apache.bookkeeper.tools.cli.commands.BookieCommandGroup
 org.apache.bookkeeper.tools.cli.commands.BookiesCommandGroup
 org.apache.bookkeeper.tools.cli.commands.LedgerCommandGroup
+org.apache.bookkeeper.stream.cli.ClusterCommandGroup
+org.apache.bookkeeper.stream.cli.NamespaceCommandGroup
+org.apache.bookkeeper.stream.cli.TableAdminCommandGroup
+org.apache.bookkeeper.stream.cli.TableCommandGroup
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
index 2f2724b..bf591ab 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKCommand.java
@@ -77,7 +77,7 @@ public abstract class BKCommand<CommandFlagsT extends CliFlags> extends CliComma
             }
         }
 
-        return apply(serviceURI, conf, cmdFlags);
+        return apply(serviceURI, conf, bkFlags, cmdFlags);
     }
 
     protected boolean acceptServiceUri(ServiceURI serviceURI) {
@@ -86,6 +86,7 @@ public abstract class BKCommand<CommandFlagsT extends CliFlags> extends CliComma
 
     protected abstract boolean apply(ServiceURI serviceURI,
                                      CompositeConfiguration conf,
+                                     BKFlags globalFlags,
                                      CommandFlagsT cmdFlags);
 
 }
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
index 890a3df..6b6c392 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/common/BKFlags.java
@@ -19,11 +19,13 @@
 package org.apache.bookkeeper.tools.common;
 
 import com.beust.jcommander.Parameter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 
 /**
  * Default BK flags.
  */
+@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
 public final class BKFlags extends CliFlags {
 
     @Parameter(
@@ -40,4 +42,11 @@ public final class BKFlags extends CliFlags {
         description = "Configuration file")
     public String configFile = null;
 
+    @Parameter(
+        names = {
+            "-n", "--namespace"
+        },
+        description = "Namespace scope to run commands (only valid for table service for now)")
+    public String namespace = "default";
+
 }
diff --git a/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java b/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
index 44b210f..83cf9b9 100644
--- a/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
+++ b/tools/framework/src/main/java/org/apache/bookkeeper/tools/framework/Cli.java
@@ -231,6 +231,7 @@ public class Cli<CliFlagsT extends CliFlags> {
             try {
                 return command.apply(spec.flags(), subCmdArgs);
             } catch (Exception e) {
+                e.printStackTrace(spec.console());
                 usage(e.getMessage());
                 return false;
             }
diff --git a/tools/pom.xml b/tools/pom.xml
index 46b7763..ee48821 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -29,4 +29,19 @@
     <module>framework</module>
     <module>all</module>
   </modules>
+  <profiles>
+  <!-- include stream storage cli only when -Dstream is provided -->
+    <profile>
+      <id>stream</id>
+      <activation>
+        <property>
+          <name>stream</name>
+        </property>
+      </activation>
+      <modules>
+        <!-- enable building stream storage cli -->
+        <module>stream</module>
+      </modules>
+    </profile>
+  </profiles>
 </project>
diff --git a/stream/cli/pom.xml b/tools/stream/pom.xml
similarity index 70%
rename from stream/cli/pom.xml
rename to tools/stream/pom.xml
index b1e3aea..8bfad0e 100644
--- a/stream/cli/pom.xml
+++ b/tools/stream/pom.xml
@@ -21,28 +21,33 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
+    <artifactId>bookkeeper-tools-parent</artifactId>
     <version>4.8.0-SNAPSHOT</version>
   </parent>
   <artifactId>stream-storage-cli</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: CLI</name>
+  <name>Apache BookKeeper :: Tools :: Stream</name>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.beust</groupId>
-      <artifactId>jcommander</artifactId>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-service-impl</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-tools-framework</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>runtime</scope>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
     </dependency>
   </dependencies>
-  <build>
-  </build>
+
 </project>
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java
similarity index 51%
copy from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
copy to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java
index c881763..375bc66 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/ClusterCommandGroup.java
@@ -15,8 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.stream.cli;
+
+import org.apache.bookkeeper.stream.cli.commands.cluster.InitClusterCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Storage CLI.
+ * Commands that administrate bookkeeper clusters.
  */
-package org.apache.bookkeeper.stream.cli;
+public class ClusterCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "cluster";
+    private static final String DESC = "Commands on administrating bookkeeper clusters";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new InitClusterCommand())
+        .build();
+
+    public ClusterCommandGroup() {
+        super(spec);
+    }
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
similarity index 54%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
index 20f9cf1..be9e062 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdNamespace.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/NamespaceCommandGroup.java
@@ -15,19 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.stream.cli.commands.namespace.CreateNamespaceCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands that operates a single bookie.
+ * Commands that operate stream storage namespaces.
  */
-@Parameters(commandDescription = "Commands on operating namespaces")
-public class CmdNamespace extends CmdBase {
-    public CmdNamespace(StorageClientSettings.Builder settingsBuilder) {
-        super("namespace", settingsBuilder);
-        addSubCommand(new CreateNamespaceCommand());
+public class NamespaceCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "namespace";
+    private static final String DESC = "Commands on operating namespaces";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new CreateNamespaceCommand())
+        .build();
+
+    public NamespaceCommandGroup() {
+        super(spec);
     }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
similarity index 51%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
index 38f0160..60b9892 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableAdminCommandGroup.java
@@ -15,26 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * An admin command interface provides a run method to execute admin commands.
+ * Commands that administrate tables.
  */
-public abstract class ClientCommand implements SubCommand {
+public class TableAdminCommandGroup extends CliCommandGroup<BKFlags> {
 
-    @Override
-    public void run(String namespace, StorageClientSettings settings) throws Exception {
-        try (StorageClient client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build()) {
-            run(client);
-        }
-    }
+    private static final String NAME = "tables";
+    private static final String DESC = "Commands on operating tables";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new CreateTableCommand())
+        .build();
 
-    protected abstract void run(StorageClient client) throws Exception;
+    public TableAdminCommandGroup() {
+        super(spec);
+    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
similarity index 56%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
index 6b5e084..09d3be6 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/CmdTable.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/TableCommandGroup.java
@@ -15,23 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.cli.commands;
+package org.apache.bookkeeper.stream.cli;
 
-import com.beust.jcommander.Parameters;
-import org.apache.bookkeeper.clients.config.StorageClientSettings.Builder;
 import org.apache.bookkeeper.stream.cli.commands.table.GetCommand;
 import org.apache.bookkeeper.stream.cli.commands.table.IncrementCommand;
 import org.apache.bookkeeper.stream.cli.commands.table.PutCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands that operates a single bookie.
+ * Commands that interact with tables.
  */
-@Parameters(commandDescription = "Commands on operating tables")
-public class CmdTable extends CmdBase {
-    public CmdTable(Builder settingsBuilder) {
-        super("table", settingsBuilder);
-        addSubCommand(new PutCommand());
-        addSubCommand(new GetCommand());
-        addSubCommand(new IncrementCommand());
+public class TableCommandGroup extends CliCommandGroup<BKFlags> {
+
+    private static final String NAME = "table";
+    private static final String DESC = "Commands on interacting with tables";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent("bkctl")
+        .addCommand(new PutCommand())
+        .addCommand(new GetCommand())
+        .addCommand(new IncrementCommand())
+        .build();
+
+    public TableCommandGroup() {
+        super(spec);
     }
 }
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
new file mode 100644
index 0000000..eb319aa
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/AdminCommand.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Base class for admin commands; it builds a storage admin client and passes it to the run method.
+ */
+@Slf4j
+public abstract class AdminCommand<ClientFlagsT extends CliFlags> extends BKCommand<ClientFlagsT> {
+
+    protected AdminCommand(CliSpec<ClientFlagsT> spec) {
+        super(spec);
+    }
+
+    @Override
+    protected boolean acceptServiceUri(ServiceURI serviceURI) {
+        return ServiceURI.SERVICE_BK.equals(serviceURI.getServiceName());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags bkFlags,
+                            ClientFlagsT cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service URI is provided");
+
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .clientName("bkctl")
+            .serviceUri(serviceURI.getUri().toString())
+            .build();
+
+        try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin()) {
+            run(admin, bkFlags, cmdFlags);
+            return true;
+        } catch (Exception e) {
+            log.error("Failed to process stream admin command", e);
+            return false;
+        }
+    }
+
+    protected abstract void run(StorageAdminClient admin, BKFlags globalFlags, ClientFlagsT cmdFlags)
+        throws Exception;
+}
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
new file mode 100644
index 0000000..03232a3
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/ClientCommand.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Base class for client commands; it builds a storage client for the namespace and passes it to the run method.
+ */
+@Slf4j
+public abstract class ClientCommand<ClientFlagsT extends CliFlags> extends BKCommand<ClientFlagsT> {
+
+    protected ClientCommand(CliSpec<ClientFlagsT> spec) {
+        super(spec);
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags,
+                            ClientFlagsT cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service uri is provided");
+
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .clientName("bkctl")
+            .serviceUri(serviceURI.getUri().toString())
+            .build();
+
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(globalFlags.namespace)
+            .build()) {
+            run(client, cmdFlags);
+            return true;
+        } catch (Exception e) {
+            log.error("Failed to process commands under namespace '{}'",
+                globalFlags.namespace, e);
+            return false;
+        }
+    }
+
+    protected abstract void run(StorageClient client,
+                                ClientFlagsT cmdFlags) throws Exception;
+}
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java
new file mode 100644
index 0000000..2337f00
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/InitClusterCommand.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.stream.cli.commands.cluster;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.URI;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.stream.cli.commands.cluster.InitClusterCommand.Flags;
+import org.apache.bookkeeper.stream.storage.StorageConstants;
+import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+
+/**
+ * Command to init a cluster.
+ */
+@Slf4j
+public class InitClusterCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "init";
+    private static final String DESC = "Init a cluster";
+
+    /**
+     * Flags for the init cluster command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-x", "--cluster-name"
+            },
+            description = "cluster name. this would be used as a root path for storing metadata.")
+        private String clusterName = "";
+
+        @Parameter(
+            names = {
+                "-l", "--ledgers-path"
+            },
+            description = "root path to store ledgers' metadata")
+        private String ledgersPath = "/ledgers";
+
+        @Parameter(
+            names = {
+                "-dl", "--dlog-path"
+            },
+            description = "root path to store dlog metadata")
+        private String dlogPath = "/distributedlog";
+
+        @Parameter(
+            names = {
+                "-n", "--num-storage-containers"
+            },
+            description = "num of storage containers allocated for stream storage")
+        private int numStorageContainers = 32;
+
+    }
+
+    public InitClusterCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean acceptServiceUri(ServiceURI serviceURI) {
+        // only support zookeeper now
+        return ServiceURI.SERVICE_ZK.equals(serviceURI.getServiceName());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags,
+                            Flags cmdFlags) {
+        checkArgument(
+            null != serviceURI,
+            "No service URI is provided");
+
+        if (null != cmdFlags.clusterName) {
+            checkArgument(
+                !cmdFlags.clusterName.contains("/"),
+                "Invalid cluster name : " + cmdFlags.clusterName);
+        }
+
+        checkArgument(
+            !Strings.isNullOrEmpty(cmdFlags.ledgersPath)
+                && cmdFlags.ledgersPath.startsWith("/"),
+            "Invalid ledgers root metadata path : " + cmdFlags.ledgersPath);
+
+        checkArgument(
+            !Strings.isNullOrEmpty(cmdFlags.dlogPath)
+                && cmdFlags.dlogPath.startsWith("/"),
+            "Invalid dlog root metadata path : " + cmdFlags.dlogPath);
+
+        checkArgument(
+            cmdFlags.numStorageContainers > 0,
+            "Zero or negative number of storage containers configured");
+
+        String clusterName = null == cmdFlags.clusterName ? "" : cmdFlags.clusterName;
+        String ledgersPath = getFullyQualifiedPath(clusterName, cmdFlags.ledgersPath);
+        String dlogPath = getFullyQualifiedPath(clusterName, cmdFlags.dlogPath);
+
+        String metadataServiceHosts = StringUtils.join(serviceURI.getServiceHosts(), ",");
+
+        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
+            metadataServiceHosts,
+            new BoundedExponentialBackoffRetry(100, 10000, 20)
+        )) {
+            client.start();
+
+            URI uri = serviceURI.getUri();
+            URI rootUri = new URI(
+                uri.getScheme(),
+                uri.getAuthority(),
+                "",
+                null,
+                null);
+
+            String ledgersUri = rootUri.toString() + ledgersPath;
+            String dlogUri = rootUri.toString() + dlogPath;
+
+            log.info("Initializing cluster {} : \n"
+                + "\tledgers : path = {}, uri = {}\n"
+                + "\tdlog: path = {}, uri = {}\n"
+                + "\tstream storage: path = {}, num_storage_containers = {}",
+                clusterName,
+                ledgersPath, ledgersUri,
+                dlogPath, dlogUri,
+                StorageConstants.ZK_METADATA_ROOT_PATH, cmdFlags.numStorageContainers);
+
+            // create the cluster root path
+            initializeCluster(client, clusterName);
+
+            // init the ledgers metadata
+            initLedgersMetadata(ledgersUri);
+
+            // init the dlog metadata
+            initDlogMetadata(client, metadataServiceHosts, dlogUri, dlogPath, ledgersPath);
+
+            // init the stream storage metadata
+            initStreamStorageMetadata(
+                metadataServiceHosts,
+                ledgersUri,
+                cmdFlags.numStorageContainers);
+            log.info("Successfully initialized cluster {}", clusterName);
+            return true;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Creates the "/" + clusterName root path unless the name is empty or the path exists. */
+    private void initializeCluster(CuratorFramework client, String clusterName) throws Exception {
+        if (Strings.isNullOrEmpty(clusterName)) {
+            return;
+        }
+
+        String clusterRoot = "/" + clusterName;
+        if (null != client.checkExists().forPath(clusterRoot)) {
+            return;
+        }
+        try {
+            client.create().forPath(clusterRoot);
+        } catch (KeeperException.NodeExistsException ignored) {
+            // another initializer created the root path first; nothing left to do
+        }
+    }
+
+    /** Runs initNewCluster on the registration manager obtained for ledgersUri. */
+    private void initLedgersMetadata(String ledgersUri) throws Exception {
+        ServerConfiguration serverConf = new ServerConfiguration().setMetadataServiceUri(ledgersUri);
+        MetadataDrivers.runFunctionWithRegistrationManager(serverConf, registrationManager -> {
+            try {
+                if (registrationManager.initNewCluster()) {
+                    log.info("Successfully initialized ledgers metadata at {}", ledgersUri);
+                }
+            } catch (Exception e) {
+                // rethrow unchecked with the uri in the message; the original cause is preserved
+                throw new UncheckedExecutionException("Failed to init ledgers metadata at " + ledgersUri, e);
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Creates the dlog namespace metadata at dlogUri when dlogPath does not exist yet.
+     * @throws Exception if the existence check fails or creation fails for a reason other than NODEEXISTS
+     */
+    private void initDlogMetadata(CuratorFramework client, String metadataServiceHosts,
+                                  String dlogUri, String dlogPath, String ledgersPath) throws Exception {
+        BKDLConfig dlogConfig = new BKDLConfig(metadataServiceHosts, ledgersPath);
+        DLMetadata dlogMetadata = DLMetadata.create(dlogConfig);
+        if (null == client.checkExists().forPath(dlogPath)) {
+            try {
+                dlogMetadata.create(URI.create(dlogUri));
+            } catch (ZKException zke) {
+                // only a lost creation race is benign; swallowing other failures would report a false success
+                if (Code.NODEEXISTS != zke.getKeeperExceptionCode()) {
+                    throw zke;
+                }
+            }
+        }
+    }
+
+    /** Runs the zookeeper cluster initializer and logs the location when it returns true. */
+    private void initStreamStorageMetadata(String metadataServiceHosts, String ledgersUri,
+                                           int numStorageContainers) {
+        ZkClusterInitializer zkInitializer = new ZkClusterInitializer(metadataServiceHosts);
+        boolean initialized = zkInitializer.initializeCluster(URI.create(ledgersUri), numStorageContainers);
+        if (initialized) {
+            log.info("Successfully initialized stream storage metadata at {}:{}",
+                metadataServiceHosts, StorageConstants.ZK_METADATA_ROOT_PATH);
+        }
+    }
+
+    /**
+     * Prepends "/" + clusterName to path; a null or empty cluster name leaves path unprefixed.
+     * @return the cluster-qualified path
+     */
+    private static String getFullyQualifiedPath(String clusterName, String path) {
+        String prefix = Strings.isNullOrEmpty(clusterName) ? "" : "/" + clusterName;
+        return prefix + path;
+    }
+
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
similarity index 87%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
index f262f82..36c523b 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/stream/package-info.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/cluster/package-info.java
@@ -17,6 +17,6 @@
  */
 
 /**
- * This package contains all the stream related commands.
+ * This package contains all the cluster related commands.
  */
-package org.apache.bookkeeper.stream.cli.commands.stream;
+package org.apache.bookkeeper.stream.cli.commands.cluster;
\ No newline at end of file
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
similarity index 54%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
index fa6af6e..6f5e435 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/CreateNamespaceCommand.java
@@ -17,42 +17,60 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.namespace;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
+import org.apache.bookkeeper.stream.cli.commands.namespace.CreateNamespaceCommand.Flags;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Command to create a namespace.
  */
-@Parameters(commandDescription = "Create a namespace")
-public class CreateNamespaceCommand extends AdminCommand {
+public class CreateNamespaceCommand extends AdminCommand<Flags> {
 
-    @Parameter(names = { "-n", "--name" }, description = "namespace name")
-    private String namespaceName;
+    private static final String NAME = "create";
+    private static final String DESC = "Create a namespace";
+
+    /**
+     * Flags for the create namespace command.
+     */
+    public static class Flags extends CliFlags {
+    }
+
+    public CreateNamespaceCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<namespace-name>")
+            .build());
+    }
 
     @Override
-    protected void run(String namespace, StorageAdminClient admin) throws Exception {
-        checkNotNull(namespaceName, "Namespace name is not provided");
-        System.out.println("Creating namespace '" + namespaceName + "' ...");
+    protected void run(StorageAdminClient admin,
+                       BKFlags globalFlags,
+                       Flags flags) throws Exception {
+        checkArgument(!flags.arguments.isEmpty(),
+            "Namespace name is not provided");
+
+        String namespaceName = flags.arguments.get(0);
+
+        spec.console().println("Creating namespace '" + namespaceName + "' ...");
         NamespaceProperties nsProps = result(
             admin.createNamespace(
                 namespaceName,
                 NamespaceConfiguration.newBuilder()
                     .setDefaultStreamConf(DEFAULT_STREAM_CONF)
                     .build()));
-        System.out.println("Successfully created namespace '" + namespaceName + "':");
-        System.out.println(nsProps);
+        spec.console().println("Successfully created namespace '" + namespaceName + "':");
+        spec.console().println(nsProps);
     }
 
-    @Override
-    public String name() {
-        return "create";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/namespace/package-info.java
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/package-info.java
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
new file mode 100644
index 0000000..784c8c2
--- /dev/null
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.stream.cli.commands.table;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand.Flags;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * Command to create a stream.
+ */
+public class CreateTableCommand extends AdminCommand<Flags> {
+
+    private static final String NAME = "create";
+    private static final String DESC = "Create a stream";
+
+    /**
+     * Flags for the create table command.
+     */
+    public static class Flags extends CliFlags {
+    }
+
+    public CreateTableCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<stream-name>")
+            .build());
+    }
+
+    @Override
+    protected void run(StorageAdminClient admin,
+                       BKFlags globalFlags,
+                       Flags flags) throws Exception {
+        checkArgument(!flags.arguments.isEmpty(),
+            "Stream name is not provided");
+
+        String streamName = flags.arguments.get(0);
+
+        spec.console().println("Creating stream '" + streamName + "' ...");
+        StreamProperties nsProps = result(
+            admin.createStream(
+                globalFlags.namespace,
+                streamName,
+                DEFAULT_STREAM_CONF));
+        spec.console().println("Successfully created stream '" + streamName + "':");
+        spec.console().println(nsProps);
+    }
+
+}
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
similarity index 53%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
index 0b39da5..b7090fc 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/GetCommand.java
@@ -17,12 +17,11 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -30,53 +29,69 @@ import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.GetCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Command to get kv.
  */
-@Parameters(commandDescription = "Get key/value pair from a table")
-public class GetCommand extends ClientCommand {
+public class GetCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "get";
+    private static final String DESC = "Get key/value pair from a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags for the get kv command.
+     */
+    public static class Flags extends CliFlags {
 
-    @Parameter(names = { "-w", "--watch" }, description = "watch the value changes of a key")
-    private boolean watch = false;
+        @Parameter(names = { "-w", "--watch" }, description = "watch the value changes of a key")
+        private boolean watch = false;
+
+    }
+
+    public GetCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 2,
+            "table and key are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             long lastVersion = -1L;
             do {
-                try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+                try (KeyValue<ByteBuf, ByteBuf> kv =
+                         result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
                     if (null == kv) {
-                        System.out.println("key '" + key + "' doesn't exist.");
+                        spec.console().println("key '" + key + "' doesn't exist.");
                     } else {
                         if (kv.version() > lastVersion) {
                             if (kv.isNumber()) {
-                                System.out.println("value = " + kv.numberValue());
+                                spec.console().println("value = " + kv.numberValue());
                             } else {
-                                System.out.println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
+                                spec.console()
+                                    .println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
                             }
                             lastVersion = kv.version();
                         }
                     }
                 }
-                if (watch) {
+                if (flags.watch) {
                     Thread.sleep(1000);
                 }
-            } while (watch);
+            } while (flags.watch);
         }
     }
 
-    @Override
-    public String name() {
-        return "get";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
similarity index 53%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
index a049828..0e8f2e5 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/IncrementCommand.java
@@ -17,48 +17,57 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.IncrementCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
- * Commands to put kvs.
+ * Command to increment the amount of a key.
  */
-@Parameters(commandDescription = "Increment the amount of a key in a table")
-public class IncrementCommand extends ClientCommand {
+public class IncrementCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "inc";
+    private static final String DESC = "Increment the amount of a key in a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags of the increment command.
+     */
+    public static class Flags extends CliFlags {
+    }
 
-    @Parameter(names = { "-a", "--amount" }, description = "amount to increment")
-    private long amount = 0;
+    public IncrementCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key> <amount>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 3,
+            "table and key/amount are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
+        long amount = Long.parseLong(flags.arguments.get(2));
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             result(table.increment(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                 amount));
-            System.out.println("Successfully increment kv: ('" + key + "', amount = '" + amount + "').");
+            spec.console().println("Successfully increment kv: ('" + key + "', amount = '" + amount + "').");
         }
     }
 
-    @Override
-    public String name() {
-        return "incr";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
similarity index 56%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
index b9f537d..5c89c66 100644
--- a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/PutCommand.java
@@ -17,49 +17,57 @@
  */
 package org.apache.bookkeeper.stream.cli.commands.table;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
+import org.apache.bookkeeper.stream.cli.commands.table.PutCommand.Flags;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
 
 /**
  * Commands to put kvs.
  */
-@Parameters(commandDescription = "Put key/value pair to a table")
-public class PutCommand extends ClientCommand {
+public class PutCommand extends ClientCommand<Flags> {
 
-    @Parameter(names = { "-t", "--table" }, description = "table name")
-    private String tableName = null;
+    private static final String NAME = "put";
+    private static final String DESC = "Put key/value pair to a table";
 
-    @Parameter(names = { "-k", "--key" }, description = "key")
-    private String key = null;
+    /**
+     * Flags of the put command.
+     */
+    public static class Flags extends CliFlags {
+    }
 
-    @Parameter(names = { "-v", "--value" }, description = "value")
-    private String value = null;
+    public PutCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .withArgumentsUsage("<table> <key> <value>")
+            .build());
+    }
 
     @Override
-    protected void run(StorageClient client) throws Exception {
-        checkNotNull(tableName, "Table name is not provided");
-        checkNotNull(key, "Key is not provided");
-        checkNotNull(value, "Value is not provided");
+    protected void run(StorageClient client, Flags flags) throws Exception {
+        checkArgument(flags.arguments.size() >= 3,
+            "table and key/value are not provided");
+
+        String tableName = flags.arguments.get(0);
+        String key = flags.arguments.get(1);
+        String value = flags.arguments.get(2);
 
         try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
             result(table.put(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
                 Unpooled.wrappedBuffer(value.getBytes(UTF_8))));
-            System.out.println("Successfully update kv: ('" + key + "', '" + value + "').");
+            spec.console().println("Successfully update kv: ('" + key + "', '" + value + "').");
         }
     }
 
-    @Override
-    public String name() {
-        return "put";
-    }
 }
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/package-info.java
diff --git a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
similarity index 100%
rename from stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java
rename to tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/package-info.java

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.