You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:34 UTC

[02/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
deleted file mode 100644
index db0ee4e..0000000
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.management.LockInfo;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MonitorInfo;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.junit.runner.notification.Failure;
-import org.junit.runner.notification.RunListener;
-
-/**
- * JUnit run listener which prints full thread dump into System.err in case a test is failed due to
- * timeout.
- */
-public class TimedOutTestsListener extends RunListener {
-
-    static final String TEST_TIMED_OUT_PREFIX = "test timed out after";
-
-    private static String indent = "    ";
-
-    private final PrintWriter output;
-
-    public TimedOutTestsListener() {
-        this.output = new PrintWriter(System.err);
-    }
-
-    public TimedOutTestsListener(PrintWriter output) {
-        this.output = output;
-    }
-
-    @Override
-    public void testFailure(Failure failure) throws Exception {
-        if (failure != null && failure.getMessage() != null && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) {
-            output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <====");
-            output.println();
-            output.print(buildThreadDiagnosticString());
-        }
-    }
-
-    public static String buildThreadDiagnosticString() {
-        StringWriter sw = new StringWriter();
-        PrintWriter output = new PrintWriter(sw);
-
-        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
-        output.println(String.format("Timestamp: %s", dateFormat.format(new Date())));
-        output.println();
-        output.println(buildThreadDump());
-
-        String deadlocksInfo = buildDeadlockInfo();
-        if (deadlocksInfo != null) {
-            output.println("====> DEADLOCKS DETECTED <====");
-            output.println();
-            output.println(deadlocksInfo);
-        }
-
-        return sw.toString();
-    }
-
-    static String buildThreadDump() {
-        StringBuilder dump = new StringBuilder();
-        Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
-        for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
-            Thread thread = e.getKey();
-            dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", thread.getName(),
-                (thread.isDaemon() ? "daemon" : ""), thread.getPriority(), thread.getId(),
-                Thread.State.WAITING.equals(thread.getState()) ? "in Object.wait()"
-                        : StringUtils.lowerCase(thread.getState().name()),
-                Thread.State.WAITING.equals(thread.getState()) ? "WAITING (on object monitor)" : thread.getState()));
-            for (StackTraceElement stackTraceElement : e.getValue()) {
-                dump.append("\n        at ");
-                dump.append(stackTraceElement);
-            }
-            dump.append("\n");
-        }
-        return dump.toString();
-    }
-
-    static String buildDeadlockInfo() {
-        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-        long[] threadIds = threadBean.findMonitorDeadlockedThreads();
-        if (threadIds != null && threadIds.length > 0) {
-            StringWriter stringWriter = new StringWriter();
-            PrintWriter out = new PrintWriter(stringWriter);
-
-            ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, true, true);
-            for (ThreadInfo ti : infos) {
-                printThreadInfo(ti, out);
-                printLockInfo(ti.getLockedSynchronizers(), out);
-                out.println();
-            }
-
-            out.close();
-            return stringWriter.toString();
-        } else {
-            return null;
-        }
-    }
-
-    private static void printThreadInfo(ThreadInfo ti, PrintWriter out) {
-        // print thread information
-        printThread(ti, out);
-
-        // print stack trace with locks
-        StackTraceElement[] stacktrace = ti.getStackTrace();
-        MonitorInfo[] monitors = ti.getLockedMonitors();
-        for (int i = 0; i < stacktrace.length; i++) {
-            StackTraceElement ste = stacktrace[i];
-            out.println(indent + "at " + ste.toString());
-            for (MonitorInfo mi : monitors) {
-                if (mi.getLockedStackDepth() == i) {
-                    out.println(indent + "  - locked " + mi);
-                }
-            }
-        }
-        out.println();
-    }
-
-    private static void printThread(ThreadInfo ti, PrintWriter out) {
-        out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState());
-        if (ti.getLockName() != null) {
-            out.print(" on lock=" + ti.getLockName());
-        }
-        if (ti.isSuspended()) {
-            out.print(" (suspended)");
-        }
-        if (ti.isInNative()) {
-            out.print(" (running in native)");
-        }
-        out.println();
-        if (ti.getLockOwnerName() != null) {
-            out.println(indent + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId());
-        }
-    }
-
-    private static void printLockInfo(LockInfo[] locks, PrintWriter out) {
-        out.println(indent + "Locked synchronizers: count = " + locks.length);
-        for (LockInfo li : locks) {
-            out.println(indent + "  - " + li);
-        }
-        out.println();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/resources/log4j.properties b/distributedlog-protocol/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3e51059
--- /dev/null
+++ b/distributedlog-protocol/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DisributedLog Logging Configuration
+#
+
+# Example with rolling log file
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper level to warning
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=distributedlog.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml
index 7392d90..25ad732 100644
--- a/distributedlog-proxy-client/pom.xml
+++ b/distributedlog-proxy-client/pom.xml
@@ -86,7 +86,7 @@
     </dependency> 
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-protocol</artifactId>
+      <artifactId>distributedlog-common</artifactId>
       <version>${project.parent.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
@@ -129,7 +129,7 @@
           <properties>
             <property>
               <name>listener</name>
-              <value>org.apache.distributedlog.TimedOutTestsListener</value>
+              <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
             </property>
           </properties>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
index b3f3368..781005c 100644
--- a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newJFuture;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
@@ -440,7 +441,7 @@ public class DistributedLogMultiStreamWriter implements Runnable {
         }
         Promise<DLSN> writePromise = new Promise<DLSN>();
         try {
-            recordSetWriter.writeRecord(buffer, writePromise);
+            recordSetWriter.writeRecord(buffer, newJFuture(writePromise));
         } catch (LogRecordTooLongException e) {
             return Future.exception(e);
         } catch (WriteException e) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
new file mode 100644
index 0000000..6ce1fa4
--- /dev/null
+++ b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.protocol.util;
+
+import com.google.common.collect.Lists;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+
+/**
+ * Utils for Twitter's {@link com.twitter.util.Future}.
+ */
+public final class TwitterFutureUtils {
+
+    private TwitterFutureUtils() {}
+
+    public static <T> CompletableFuture<T> newJFuture(Promise<T> promise) {
+        CompletableFuture<T> jFuture = FutureUtils.createFuture();
+        jFuture.whenComplete((value, cause) -> {
+            if (null != cause) {
+                if (cause instanceof CompletionException) {
+                    promise.setException(cause.getCause());
+                } else {
+                    promise.setException(cause);
+                }
+            } else {
+                promise.setValue(value);
+            }
+        });
+        return jFuture;
+    }
+
+    public static <T> Future<T> newTFuture(CompletableFuture<T> jFuture) {
+        Promise<T> promise = new Promise<>();
+        jFuture.whenComplete((value, cause) -> {
+            if (null != cause) {
+                if (cause instanceof CompletionException) {
+                    promise.setException(cause.getCause());
+                } else {
+                    promise.setException(cause);
+                }
+            } else {
+                promise.setValue(value);
+            }
+        });
+        return promise;
+    }
+
+    public static <T> Future<List<Future<T>>> newTFutureList(
+            CompletableFuture<List<CompletableFuture<T>>> jFutureList) {
+        Promise<List<Future<T>>> promise = new Promise<>();
+        jFutureList.whenComplete((value, cause) -> {
+            if (null != cause) {
+                if (cause instanceof CompletionException) {
+                    promise.setException(cause.getCause());
+                } else {
+                    promise.setException(cause);
+                }
+            } else {
+                promise.setValue(Lists.transform(
+                    value,
+                    future -> newTFuture(future)));
+            }
+        });
+        return promise;
+    }
+
+    public static <T> void setValue(Promise<T> promise, T value) {
+        promise.updateIfEmpty(new Return<T>(value));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml
index d7cbd56..83b2bef 100644
--- a/distributedlog-proxy-server/pom.xml
+++ b/distributedlog-proxy-server/pom.xml
@@ -135,7 +135,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-protocol</artifactId>
+      <artifactId>distributedlog-common</artifactId>
       <version>${project.parent.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
@@ -189,7 +189,7 @@
           <properties>
             <property>
               <name>listener</name>
-              <value>org.apache.distributedlog.TimedOutTestsListener</value>
+              <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
             </property>
           </properties>
         </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
index 81e476b..c904499 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
@@ -37,7 +37,7 @@ import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConver
 import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
 import org.apache.distributedlog.thrift.service.DistributedLogService;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import com.twitter.finagle.Stack;
 import com.twitter.finagle.ThriftMuxServer$;
 import com.twitter.finagle.builder.Server;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
index c37cd53..72f2758 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
@@ -25,6 +25,7 @@ import com.twitter.common.net.InetSocketAddressHelper;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
 import org.apache.distributedlog.client.resolver.RegionResolver;
 import org.apache.distributedlog.client.routing.RoutingService;
@@ -35,10 +36,9 @@ import org.apache.distributedlog.exceptions.ServiceUnavailableException;
 import org.apache.distributedlog.exceptions.StreamUnavailableException;
 import org.apache.distributedlog.exceptions.TooManyStreamsException;
 import org.apache.distributedlog.feature.AbstractFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.rate.MovingAverageRateFactory;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRateFactory;
 import org.apache.distributedlog.service.config.ServerConfiguration;
 import org.apache.distributedlog.service.config.StreamConfigProvider;
 import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
@@ -76,7 +76,7 @@ import org.apache.distributedlog.thrift.service.WriteContext;
 import org.apache.distributedlog.thrift.service.WriteResponse;
 import org.apache.distributedlog.util.ConfUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Function;
@@ -116,7 +116,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
 
     private final ServerConfiguration serverConfig;
     private final DistributedLogConfiguration dlConfig;
-    private final DistributedLogNamespace dlNamespace;
+    private final Namespace dlNamespace;
     private final int serverRegionId;
     private final PlacementPolicy placementPolicy;
     private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
@@ -199,7 +199,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         }
 
         // Build the namespace
-        this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+        this.dlNamespace = NamespaceBuilder.newBuilder()
                 .conf(dlConf)
                 .uri(uri)
                 .statsLogger(statsLogger)
@@ -218,8 +218,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         this.scheduler = OrderedScheduler.newBuilder()
                 .corePoolSize(numThreads)
                 .name("DistributedLogService-Executor")
-                .traceTaskExecution(true)
-                .statsLogger(statsLogger.scope("scheduler"))
                 .build();
 
         // Timer, kept separate to ensure reliability of timeouts.
@@ -261,7 +259,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
 
         // Resource limiting
         this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
-        this.movingAvgFactory = new MovingAverageRateFactory(timer);
+        this.movingAvgFactory = new MovingAverageRateFactory(scheduler);
         this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
         this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
         this.limiter = new ServiceRequestLimiter(
@@ -783,7 +781,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     @VisibleForTesting
-    public DistributedLogNamespace getDistributedLogNamespace() {
+    public Namespace getDistributedLogNamespace() {
         return dlNamespace;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
index b1e2879..969c598 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
@@ -27,14 +27,14 @@ import com.google.common.hash.Hashing;
 import com.twitter.common.zookeeper.ServerSet;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.callback.LogSegmentListener;
 import org.apache.distributedlog.callback.NamespaceListener;
 import org.apache.distributedlog.client.monitor.MonitorServiceClient;
 import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.stats.Stat;
 import com.twitter.finagle.stats.StatsReceiver;
@@ -71,7 +71,7 @@ public class MonitorService implements NamespaceListener {
 
     private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
 
-    private DistributedLogNamespace dlNamespace = null;
+    private Namespace dlNamespace = null;
     private MonitorServiceClient dlClient = null;
     private DLZkServerSet[] zkServerSets = null;
     private final ScheduledExecutorService executorService =
@@ -411,7 +411,7 @@ public class MonitorService implements NamespaceListener {
         // stats
         statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
         logger.info("Construct dl namespace @ {}", dlUri);
-        dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+        dlNamespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(dlUri)
                 .build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
index 08f4b41..b327867 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.distributedlog.service;
 
+import java.util.concurrent.CompletionException;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.thrift.service.BulkWriteResponse;
@@ -53,6 +54,8 @@ public class ResponseUtils {
             }
             response.setCode(StatusCode.findByValue(dle.getCode()));
             response.setErrMsg(dle.getMessage());
+        } else if (t instanceof CompletionException) {
+            return exceptionToHeader(t.getCause());
         } else {
             response.setCode(StatusCode.INTERNAL_SERVER_ERROR);
             response.setErrMsg("Internal server error : " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
index 7d72093..53e16b4 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -20,11 +20,11 @@ package org.apache.distributedlog.service.config;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
-import org.apache.distributedlog.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
 import java.io.File;
 import java.net.MalformedURLException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
index 257b4be..1e62302 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -160,7 +161,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Set the region id used to instantiate DistributedLogNamespace.
+     * Set the region id used to instantiate Namespace.
      *
      * @param regionId
      *          region id
@@ -172,9 +173,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}.
+     * Get the region id used to instantiate {@link Namespace}.
      *
-     * @return region id used to instantiate DistributedLogNamespace
+     * @return region id used to instantiate Namespace
      */
     public int getRegionId() {
         return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT);
@@ -216,7 +217,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     /**
      * Get the shard id of this server.
      *
-     * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
+     * <p>It would be used to instantiate the client id used for Namespace.
      *
      * @return shard id of this server.
      */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 2e9dd6b..1336ddd 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.service.placement;
 
 import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
 import com.twitter.util.Duration;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
@@ -52,7 +52,7 @@ public class LeastLoadPlacementPolicy extends PlacementPolicy {
     private Map<String, String> streamToServer = new HashMap<String, String>();
 
     public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                                    DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                    Namespace namespace, PlacementStateManager placementStateManager,
                                     Duration refreshInterval, StatsLogger statsLogger) {
         super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
         statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
index ac952aa..17edc22 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
@@ -17,8 +17,8 @@
  */
 package org.apache.distributedlog.service.placement;
 
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.service.DLSocketAddress;
 import com.twitter.util.Duration;
 import com.twitter.util.Function0;
@@ -53,14 +53,14 @@ public abstract class PlacementPolicy {
 
     protected final LoadAppraiser loadAppraiser;
     protected final RoutingService routingService;
-    protected final DistributedLogNamespace namespace;
+    protected final Namespace namespace;
     protected final PlacementStateManager placementStateManager;
     private final Duration refreshInterval;
     protected final OpStatsLogger placementCalcStats;
     private Timer placementRefreshTimer;
 
     public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                           DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                           Namespace namespace, PlacementStateManager placementStateManager,
                            Duration refreshInterval, StatsLogger statsLogger) {
         this.loadAppraiser = loadAppraiser;
         this.routingService = routingService;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
index 862f05a..5dcea73 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -63,8 +63,7 @@ public class ZKPlacementStateManager implements PlacementStateManager {
         serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
     }
 
-    private void createServerLoadPathIfNoExists(byte[] data)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+    private void createServerLoadPathIfNoExists(byte[] data) throws KeeperException, IOException {
         try {
             Utils.zkCreateFullPathOptimistic(
                 zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
@@ -152,7 +151,7 @@ public class ZKPlacementStateManager implements PlacementStateManager {
                 watching = false;
                 watch(callback);
             }
-        } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+        } catch (InterruptedException | IOException | KeeperException e) {
             logger.error("Watch of Ownership failed", e);
             watching = false;
             watch(callback);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
index 83ac668..7700184 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
@@ -18,13 +18,13 @@
 package org.apache.distributedlog.service.stream;
 
 import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.exceptions.ChecksumFailedException;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
 import org.apache.distributedlog.service.ResponseUtils;
 import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
index 6c98468..372703a 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
@@ -17,7 +17,9 @@
  */
 package org.apache.distributedlog.service.stream;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFutureList;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.acl.AccessControlManager;
@@ -33,7 +35,7 @@ import org.apache.distributedlog.thrift.service.BulkWriteResponse;
 import org.apache.distributedlog.thrift.service.ResponseHeader;
 import org.apache.distributedlog.thrift.service.StatusCode;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.ConstFuture;
 import com.twitter.util.Future;
 import com.twitter.util.Future$;
@@ -157,7 +159,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
         Future<List<Future<DLSN>>> futureList;
         synchronized (txnLock) {
             records = asRecordList(buffers, sequencer);
-            futureList = writer.writeBulk(records);
+            futureList = newTFutureList(writer.writeBulk(records));
         }
 
         // Collect into a list of tries to make it easier to extract exception or DLSN.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
index 3ecb46f..24ce0be 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
@@ -17,13 +17,13 @@
  */
 package org.apache.distributedlog.service.stream;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.RequestDeniedException;
 import org.apache.distributedlog.service.ResponseUtils;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
index 0ffa619..c9dec80 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
@@ -18,8 +18,9 @@
 package org.apache.distributedlog.service.stream;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.acl.AccessControlManager;
@@ -27,7 +28,7 @@ import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.RequestDeniedException;
 import org.apache.distributedlog.service.ResponseUtils;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
@@ -78,7 +79,7 @@ public class HeartbeatOp extends AbstractWriteOp {
                 txnId = sequencer.nextId();
                 LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA);
                 hbRecord.setControl();
-                writeResult = writer.write(hbRecord);
+                writeResult = newTFuture(writer.write(hbRecord));
             }
             return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
index 6ec8642..d657660 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
@@ -17,13 +17,13 @@
  */
 package org.apache.distributedlog.service.stream;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.RequestDeniedException;
 import org.apache.distributedlog.service.ResponseUtils;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
index 2b90d55..98362b5 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
@@ -18,8 +18,8 @@
 package org.apache.distributedlog.service.stream;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.service.FatalErrorHandler;
 import org.apache.distributedlog.service.config.ServerConfiguration;
 import org.apache.distributedlog.service.config.StreamConfigProvider;
@@ -40,7 +40,7 @@ public class StreamFactoryImpl implements StreamFactory {
     private final FeatureProvider featureProvider;
     private final StreamConfigProvider streamConfigProvider;
     private final StreamPartitionConverter streamPartitionConverter;
-    private final DistributedLogNamespace dlNamespace;
+    private final Namespace dlNamespace;
     private final OrderedScheduler scheduler;
     private final FatalErrorHandler fatalErrorHandler;
     private final HashedWheelTimer requestTimer;
@@ -53,7 +53,7 @@ public class StreamFactoryImpl implements StreamFactory {
         FeatureProvider featureProvider,
         StreamConfigProvider streamConfigProvider,
         StreamPartitionConverter streamPartitionConverter,
-        DistributedLogNamespace dlNamespace,
+        Namespace dlNamespace,
         OrderedScheduler scheduler,
         FatalErrorHandler fatalErrorHandler,
         HashedWheelTimer requestTimer) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
index c0c0972..df3d64f 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -20,9 +20,10 @@ package org.apache.distributedlog.service.stream;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.DLException;
@@ -33,15 +34,16 @@ import org.apache.distributedlog.exceptions.StreamNotReadyException;
 import org.apache.distributedlog.exceptions.StreamUnavailableException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 import org.apache.distributedlog.io.Abortables;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.protocol.util.TwitterFutureUtils;
 import org.apache.distributedlog.service.FatalErrorHandler;
 import org.apache.distributedlog.service.ServerFeatureKeys;
 import org.apache.distributedlog.service.config.ServerConfiguration;
 import org.apache.distributedlog.service.config.StreamConfigProvider;
 import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
 import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.TimeSequencer;
 import org.apache.distributedlog.util.Utils;
@@ -50,7 +52,6 @@ import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
-import com.twitter.util.TimeoutException;
 import com.twitter.util.Timer;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -126,7 +127,7 @@ public class StreamImpl implements Stream {
     private final StreamRequestLimiter limiter;
     private final DynamicDistributedLogConfiguration dynConf;
     private final DistributedLogConfiguration dlConfig;
-    private final DistributedLogNamespace dlNamespace;
+    private final Namespace dlNamespace;
     private final String clientId;
     private final OrderedScheduler scheduler;
     private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
@@ -169,7 +170,7 @@ public class StreamImpl implements Stream {
                DynamicDistributedLogConfiguration streamConf,
                FeatureProvider featureProvider,
                StreamConfigProvider streamConfigProvider,
-               DistributedLogNamespace dlNamespace,
+               Namespace dlNamespace,
                OrderedScheduler scheduler,
                FatalErrorHandler fatalErrorHandler,
                HashedWheelTimer requestTimer,
@@ -555,8 +556,8 @@ public class StreamImpl implements Stream {
     Future<Boolean> acquireStream() {
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final Promise<Boolean> acquirePromise = new Promise<Boolean>();
-        manager.openAsyncLogWriter().addEventListener(
-            FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+        manager.openAsyncLogWriter().whenCompleteAsync(
+            new org.apache.distributedlog.common.concurrent.FutureEventListener<AsyncLogWriter>() {
 
             @Override
             public void onSuccess(AsyncLogWriter w) {
@@ -568,7 +569,7 @@ public class StreamImpl implements Stream {
                 onAcquireStreamFailure(cause, stopwatch, acquirePromise);
             }
 
-        }, scheduler, getStreamName()));
+        }, scheduler.chooseExecutor(getStreamName()));
         return acquirePromise;
     }
 
@@ -662,7 +663,7 @@ public class StreamImpl implements Stream {
             pendingOpsCounter.dec();
         }
         Abortables.asyncAbort(oldWriter, true);
-        FutureUtils.setValue(acquirePromise, success);
+        TwitterFutureUtils.setValue(acquirePromise, success);
     }
 
     //
@@ -802,7 +803,7 @@ public class StreamImpl implements Stream {
                 logger.info("Removed cached stream {}.", getStreamName());
             }
         }
-        FutureUtils.setValue(closePromise, null);
+        TwitterFutureUtils.setValue(closePromise, null);
     }
 
     /**
@@ -825,7 +826,7 @@ public class StreamImpl implements Stream {
         }
         logger.info("Closing stream {} ...", name);
         // Close the writers to release the locks before failing the requests
-        Future<Void> closeWriterFuture;
+        CompletableFuture<Void> closeWriterFuture;
         if (abort) {
             closeWriterFuture = Abortables.asyncAbort(writer, true);
         } else {
@@ -839,25 +840,38 @@ public class StreamImpl implements Stream {
             closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
         }
 
-        FutureUtils.stats(
+        CompletableFuture<Void> maskedFuture = FutureUtils.createFuture();
+        FutureUtils.proxyTo(
+            FutureUtils.stats(
                 closeWriterFuture,
                 writerCloseStatLogger,
                 Stopwatch.createStarted()
-        ).masked().within(futureTimer, closeWaitDuration)
-                .addEventListener(FutureUtils.OrderedFutureEventListener.of(
-                new FutureEventListener<Void>() {
-                    @Override
-                    public void onSuccess(Void value) {
-                        postClose(uncache);
-                    }
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        if (cause instanceof TimeoutException) {
-                            writerCloseTimeoutCounter.inc();
-                        }
-                        postClose(uncache);
+            ),
+            maskedFuture);
+
+        FutureUtils.within(
+            maskedFuture,
+            closeWaitDuration.inMillis(),
+            TimeUnit.MILLISECONDS,
+            new java.util.concurrent.TimeoutException("Timeout on closing"),
+            scheduler,
+            name
+        ).whenCompleteAsync(
+            new org.apache.distributedlog.common.concurrent.FutureEventListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    postClose(uncache);
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    if (cause instanceof java.util.concurrent.TimeoutException) {
+                        writerCloseTimeoutCounter.inc();
                     }
-                }, scheduler, name));
+                }
+            },
+            scheduler.chooseExecutor(name)
+        );
         return closePromise;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
index 5d54738..fd57c17 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
@@ -21,11 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
 import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.ServiceUnavailableException;
 import org.apache.distributedlog.exceptions.StreamUnavailableException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.service.config.StreamConfigProvider;
 import org.apache.distributedlog.service.streamset.Partition;
 import org.apache.distributedlog.service.streamset.PartitionMap;
@@ -86,7 +86,7 @@ public class StreamManagerImpl implements StreamManager {
     private final String clientId;
     private boolean closed = false;
     private final StreamFactory streamFactory;
-    private final DistributedLogNamespace dlNamespace;
+    private final Namespace dlNamespace;
 
     public StreamManagerImpl(String clientId,
                              DistributedLogConfiguration dlConfig,
@@ -94,7 +94,7 @@ public class StreamManagerImpl implements StreamManager {
                              StreamFactory streamFactory,
                              StreamPartitionConverter partitionConverter,
                              StreamConfigProvider streamConfigProvider,
-                             DistributedLogNamespace dlNamespace) {
+                             Namespace dlNamespace) {
         this.clientId = clientId;
         this.executorService = executorService;
         this.streamFactory = streamFactory;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
index d0b8de4..b608e11 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
@@ -18,10 +18,10 @@
 package org.apache.distributedlog.service.stream;
 
 import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
index f3fc610..feb2c6a 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
@@ -18,7 +18,7 @@
 package org.apache.distributedlog.service.stream;
 
 import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
index 0036a5c..5d6dd1c 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
@@ -17,7 +17,9 @@
  */
 package org.apache.distributedlog.service.stream;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.exceptions.DLException;
@@ -25,14 +27,13 @@ import org.apache.distributedlog.exceptions.RequestDeniedException;
 import org.apache.distributedlog.protocol.util.ProtocolUtils;
 import org.apache.distributedlog.service.ResponseUtils;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 /**
  * Operation to truncate a log stream.
@@ -72,12 +73,7 @@ public class TruncateOp extends AbstractWriteOp {
             logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
             return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
         }
-        return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() {
-            @Override
-            public WriteResponse apply(Boolean v1) {
-                return ResponseUtils.writeSuccess();
-            }
-        });
+        return newTFuture(writer.truncate(dlsn).thenApply((value) -> ResponseUtils.writeSuccess()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index 2e7ffb8..0a8a2da 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -17,7 +17,9 @@
  */
 package org.apache.distributedlog.service.stream;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.acl.AccessControlManager;
@@ -31,7 +33,7 @@ import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
 import org.apache.distributedlog.thrift.service.ResponseHeader;
 import org.apache.distributedlog.thrift.service.StatusCode;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import java.nio.ByteBuffer;
@@ -150,7 +152,7 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
             if (isRecordSet) {
                 record.setRecordSet();
             }
-            writeResult = writer.write(record);
+            writeResult = newTFuture(writer.write(record));
         }
         return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index de805aa..549262d 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -22,7 +22,7 @@ import org.apache.distributedlog.exceptions.OverCapacityException;
 import org.apache.distributedlog.limiter.ChainedRequestLimiter;
 import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
 import org.apache.distributedlog.service.stream.StreamManager;
 import org.apache.distributedlog.service.stream.StreamOp;
 import org.apache.bookkeeper.feature.Feature;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
index 7675d6f..2551a5e 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.service.stream.limiter;
 import org.apache.distributedlog.exceptions.OverCapacityException;
 import org.apache.distributedlog.exceptions.TooManyStreamsException;
 import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
 import org.apache.distributedlog.service.stream.StreamManager;
 import org.apache.distributedlog.service.stream.StreamOp;
 import org.apache.bookkeeper.stats.Counter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
index 4a5dd01..16e36c9 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
@@ -26,23 +26,23 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Optional;
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.LogRecordWithDLSN;
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
 import org.apache.distributedlog.client.routing.LocalRoutingService;
 import org.apache.distributedlog.exceptions.DLException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.impl.acl.ZKAccessControl;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.service.stream.StreamManagerImpl;
 import org.apache.distributedlog.thrift.AccessControlEntry;
 import org.apache.distributedlog.thrift.service.BulkWriteResponse;
@@ -50,7 +50,7 @@ import org.apache.distributedlog.thrift.service.HeartbeatOptions;
 import org.apache.distributedlog.thrift.service.StatusCode;
 import org.apache.distributedlog.thrift.service.WriteContext;
 import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Await;
@@ -105,7 +105,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         HeartbeatOptions hbOptions = new HeartbeatOptions();
         hbOptions.setSendHeartBeatToReader(true);
         // make sure the first log segment of each stream created
-        FutureUtils.result(dlClient.dlClient.heartbeat(name));
+        Await.result(dlClient.dlClient.heartbeat(name));
 
         DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
         LogReader reader = dlm.getInputStream(1);
@@ -305,7 +305,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         }
 
         validateFailedAsLogRecordTooLong(futures.get(writeCount));
-        FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
+        Await.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
         assertEquals(writeCount, succeeded);
     }
 
@@ -325,7 +325,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
 
         List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
         validateFailedAsLogRecordTooLong(futures.get(0));
-        FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
+        Await.result(Futures.collect(futures.subList(1, writeCount + 1)));
     }
 
     @Test(timeout = 60000)
@@ -601,7 +601,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
                 .connectionTimeoutMs(60000)
                 .sessionTimeoutMs(60000)
                 .build();
-        DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
+        Namespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
         BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
         String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
         ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
index 4a2d65f..60f814e 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -50,7 +50,6 @@ import org.apache.distributedlog.thrift.service.StatusCode;
 import org.apache.distributedlog.thrift.service.WriteContext;
 import org.apache.distributedlog.thrift.service.WriteResponse;
 import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
 import com.twitter.util.Await;
 import com.twitter.util.Future;
 import java.net.URI;
@@ -446,7 +445,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             assertTrue("Write should not fail before closing",
                     futureList.get(i).isDefined());
             WriteResponse response = Await.result(futureList.get(i));
-            assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
+            assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION
+                    + " but " + response.getHeader().getCode() + " is received.",
                     StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
                         || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
                         || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
@@ -500,7 +500,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         }
         assertTrue("Stream " + streamName + " should be cached",
                 streamManager.getCachedStreams().containsKey(streamName));
-        List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
+        List<WriteResponse> resultList = Await.result(Future.collect(futureList));
         for (WriteResponse wr : resultList) {
             assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
         }
@@ -689,7 +689,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             HeartbeatOptions hbOptions = new HeartbeatOptions();
             hbOptions.setSendHeartBeatToReader(true);
             // make sure the first log segment of each stream created
-            FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+            Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
             for (int j = 0; j < numWrites; j++) {
                 futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
             }
@@ -741,7 +741,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             HeartbeatOptions hbOptions = new HeartbeatOptions();
             hbOptions.setSendHeartBeatToReader(true);
             // make sure the first log segment of each stream created
-            FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+            Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
             for (int j = 0; j < numWrites; j++) {
                 futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
             }
@@ -803,7 +803,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         service.startPlacementPolicy();
 
-        WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+        WriteResponse response = Await.result(service.getOwner("stream-1", new WriteContext()));
         assertEquals(StatusCode.FOUND, response.getHeader().getCode());
         assertEquals(service.getServiceAddress().toString(),
                 response.getHeader().getLocation());
@@ -824,7 +824,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         assertNull(stream.getLastException());
 
         // the stream is acquired
-        response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+        response = Await.result(service.getOwner("stream-2", new WriteContext()));
         assertEquals(StatusCode.FOUND, response.getHeader().getCode());
         assertEquals(service.getServiceAddress().toString(),
                 response.getHeader().getLocation());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
index 5f5ecd4..5b8e3a6 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Future;
@@ -55,7 +55,7 @@ public class TestLeastLoadPlacementPolicy {
         int numSevers = new Random().nextInt(20) + 1;
         int numStreams = new Random().nextInt(200) + 1;
         RoutingService mockRoutingService = mock(RoutingService.class);
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        Namespace mockNamespace = mock(Namespace.class);
         LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
             new EqualLoadAppraiser(),
             mockRoutingService,
@@ -81,7 +81,7 @@ public class TestLeastLoadPlacementPolicy {
         int numStreams = new Random().nextInt(200) + 1;
         RoutingService mockRoutingService = mock(RoutingService.class);
         when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        Namespace mockNamespace = mock(Namespace.class);
         try {
             when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
         } catch (IOException e) {
@@ -112,7 +112,7 @@ public class TestLeastLoadPlacementPolicy {
     /* use AtomicInteger to have a final object in answer method */
         final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
         RoutingService mockRoutingService = mock(RoutingService.class);
-        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        Namespace mockNamespace = mock(Namespace.class);
         LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
         when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
index 56e9483..2e87b71 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.service.config.StreamConfigProvider;
 import org.apache.distributedlog.service.streamset.Partition;
 import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
@@ -67,7 +67,7 @@ public class TestStreamManager {
                 mockStreamFactory,
                 mockPartitionConverter,
                 mockStreamConfigProvider,
-                mock(DistributedLogNamespace.class));
+                mock(Namespace.class));
 
         assertFalse(streamManager.isAcquired("stream1"));
         assertEquals(0, streamManager.numAcquired());
@@ -117,7 +117,7 @@ public class TestStreamManager {
             (DynamicDistributedLogConfiguration) any(),
             (StreamManager) any())
         ).thenReturn(mockStream);
-        DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
+        Namespace dlNamespace = mock(Namespace.class);
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
 
         StreamManager streamManager = new StreamManagerImpl(

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
index a18fda1..dc861a4 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
@@ -22,7 +22,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.DLSN;
 import org.apache.distributedlog.LogRecord;
 import org.apache.distributedlog.acl.DefaultAccessControlManager;
@@ -32,9 +32,9 @@ import org.apache.distributedlog.service.config.ServerConfiguration;
 import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import org.apache.distributedlog.thrift.service.StatusCode;
 import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.Sequencer;
 import com.twitter.util.Await;
-import com.twitter.util.Future;
 import java.nio.ByteBuffer;
 import org.apache.bookkeeper.feature.SettableFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -79,7 +79,7 @@ public class TestStreamOp {
     @Test(timeout = 60000)
     public void testResponseSucceededThenFailed() throws Exception {
         AsyncLogWriter writer = mock(AsyncLogWriter.class);
-        when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
+        when(writer.write((LogRecord) any())).thenReturn(FutureUtils.value(new DLSN(1, 2, 3)));
         when(writer.getStreamName()).thenReturn("test");
         WriteOp writeOp = getWriteOp();
         writeOp.execute(writer, new Sequencer() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
index 431bfa4..ccf3188 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.OverCapacityException;
 import org.apache.distributedlog.limiter.ChainedRequestLimiter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
index 56a6417..7aa93f6 100644
--- a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
+++ b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
 log4j.logger.org.apache.bookkeeper=INFO
 
 # redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
 log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
 log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
 
 log4j.appender.Executors=org.apache.log4j.RollingFileAppender