You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:46 UTC

[14/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
new file mode 100644
index 0000000..45dc021
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.namespace;
+
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.BKDistributedLogNamespace;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.injector.AsyncRandomFailureInjector;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.namespace.NamespaceDriverManager;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.util.SimplePermitLimiter;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Builder to construct a <code>Namespace</code>.
+ * The builder is responsible for loading the backend according to the uri.
+ *
+ * @see Namespace
+ * @since 0.3.32
+ */
+public class NamespaceBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(NamespaceBuilder.class);
+
+    public static NamespaceBuilder newBuilder() {
+        return new NamespaceBuilder();
+    }
+
+    private DistributedLogConfiguration _conf = null;
+    private DynamicDistributedLogConfiguration _dynConf = null;
+    private URI _uri = null;
+    private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
+    private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
+    private FeatureProvider _featureProvider = null;
+    private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
+    private int _regionId = DistributedLogConstants.LOCAL_REGION_ID;
+
+    // private constructor: instances are obtained through newBuilder()
+    private NamespaceBuilder() {}
+
+    /**
+     * DistributedLog Configuration used for the namespace. Required by {@link #build()}.
+     *
+     * @param conf
+     *          distributedlog configuration
+     * @return namespace builder.
+     */
+    public NamespaceBuilder conf(DistributedLogConfiguration conf) {
+        this._conf = conf;
+        return this;
+    }
+
+    /**
+     * Dynamic DistributedLog Configuration used for the namespace.
+     *
+     * @param dynConf dynamic distributedlog configuration; if unset, build() takes it from ConfUtils.getConstDynConf
+     * @return namespace builder
+     */
+    public NamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) {
+        this._dynConf = dynConf;
+        return this;
+    }
+
+    /**
+     * Namespace Location. Required by {@link #build()}.
+     *
+     * @param uri
+     *          namespace location uri.
+     * @see Namespace
+     * @return namespace builder.
+     */
+    public NamespaceBuilder uri(URI uri) {
+        this._uri = uri;
+        return this;
+    }
+
+    /**
+     * Stats Logger used for stats collection.
+     *
+     * @param statsLogger
+     *          stats logger
+     * @return namespace builder.
+     */
+    public NamespaceBuilder statsLogger(StatsLogger statsLogger) {
+        this._statsLogger = statsLogger;
+        return this;
+    }
+
+    /**
+     * Stats Logger used for collecting per log stats.
+     *
+     * @param statsLogger
+     *          stats logger for collecting per log stats
+     * @return namespace builder.
+     */
+    public NamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) {
+        this._perLogStatsLogger = statsLogger;
+        return this;
+    }
+
+    /**
+     * Feature provider used to control the availabilities of features in the namespace.
+     *
+     * @param featureProvider
+     *          feature provider to control availabilities of features; build() uses a default one if unset.
+     * @return namespace builder.
+     */
+    public NamespaceBuilder featureProvider(FeatureProvider featureProvider) {
+        this._featureProvider = featureProvider;
+        return this;
+    }
+
+    /**
+     * Client Id used for accessing the namespace.
+     *
+     * @param clientId
+     *          client id used for accessing the namespace
+     * @return namespace builder.
+     */
+    public NamespaceBuilder clientId(String clientId) {
+        this._clientId = clientId;
+        return this;
+    }
+
+    /**
+     * Region Id used for encoding logs in the namespace. The region id
+     * is useful when the namespace is globally spanning over regions.
+     *
+     * @param regionId
+     *          region id.
+     * @return namespace builder.
+     */
+    public NamespaceBuilder regionId(int regionId) {
+        this._regionId = regionId;
+        return this;
+    }
+
+    // Uses statsLogger.scope("stream") when perLogStatsLogger is the null logger and per-stream stats are enabled.
+    @SuppressWarnings("deprecation")
+    private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger,
+                                                          StatsLogger perLogStatsLogger,
+                                                          DistributedLogConfiguration conf) {
+        if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
+                conf.getEnablePerStreamStat()) {
+            return statsLogger.scope("stream");
+        }
+        return perLogStatsLogger;
+    }
+
+    /**
+     * Build the namespace from the settings collected by this builder.
+     *
+     * @return the namespace instance.
+     * @throws IllegalArgumentException when an illegal argument was provided to the builder
+     * @throws NullPointerException when the configuration or the uri was not provided
+     * @throws IOException when building the backend fails
+     */
+    public Namespace build()
+            throws IllegalArgumentException, NullPointerException, IOException {
+        // Check arguments
+        Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
+        Preconditions.checkNotNull(_uri, "No DistributedLog URI");
+
+        // validate the configuration
+        _conf.validate();
+        if (null == _dynConf) {
+            _dynConf = ConfUtils.getConstDynConf(_conf);
+        }
+
+        // retrieve the namespace driver
+        NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri);
+        URI normalizedUri = DLUtils.normalizeURI(_uri);
+
+        // build the feature provider
+        FeatureProvider featureProvider;
+        if (null == _featureProvider) {
+            featureProvider = new SettableFeatureProvider("", 0);
+            logger.info("No feature provider is set. All features are disabled now.");
+        } else {
+            featureProvider = _featureProvider;
+        }
+
+        // build the failure injector
+        AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
+                .injectDelays(_conf.getEIInjectReadAheadDelay(),
+                              _conf.getEIInjectReadAheadDelayPercent(),
+                              _conf.getEIInjectMaxReadAheadDelayMs())
+                .injectErrors(false, 10)
+                .injectStops(_conf.getEIInjectReadAheadStall(), 10)
+                .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries())
+                .build();
+
+        // normalize the per log stats logger
+        StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
+
+        // build the scheduler
+        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                .name("DLM-" + normalizedUri.getPath())
+                .corePoolSize(_conf.getNumWorkerThreads())
+                .build();
+
+        // initialize the namespace driver
+        driver.initialize(
+                _conf,
+                _dynConf,
+                normalizedUri,
+                scheduler,
+                featureProvider,
+                failureInjector,
+                _statsLogger,
+                perLogStatsLogger,
+                DLUtils.normalizeClientId(_clientId),
+                _regionId);
+
+        // initialize the write limiter; Locale.ROOT keeps the feature name independent of the default locale
+        PermitLimiter writeLimiter;
+        if (_conf.getGlobalOutstandingWriteLimit() < 0) {
+            writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+        } else {
+            Feature disableWriteLimitFeature = featureProvider.getFeature(
+                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase(java.util.Locale.ROOT));
+            writeLimiter = new SimplePermitLimiter(
+                _conf.getOutstandingWriteLimitDarkmode(),
+                _conf.getGlobalOutstandingWriteLimit(),
+                _statsLogger.scope("writeLimiter"),
+                true /* singleton */,
+                disableWriteLimitFeature);
+        }
+
+        return new BKDistributedLogNamespace(
+                _conf,
+                normalizedUri,
+                driver,
+                scheduler,
+                featureProvider,
+                writeLimiter,
+                failureInjector,
+                _statsLogger,
+                perLogStatsLogger,
+                DLUtils.normalizeClientId(_clientId),
+                _regionId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
new file mode 100644
index 0000000..fa8f288
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Namespace API and the builder used to build a namespace instance.
+ */
+package org.apache.distributedlog.api.namespace;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
new file mode 100644
index 0000000..eca11fd
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * DistributedLog NEW API.
+ *
+ * <p>This is the new Java8 {@link java.util.concurrent.CompletableFuture} based API. It is
+ * <strong>experimental</strong> and still under development.
+ */
+package org.apache.distributedlog.api;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
new file mode 100644
index 0000000..bf4a8d3
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.subscription;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+
+public interface SubscriptionStateStore extends Closeable {
+    /**
+     * Get the last committed position stored for this subscription.
+     *
+     * @return future representing the last committed position
+     */
+    public CompletableFuture<DLSN> getLastCommitPosition();
+
+    /**
+     * Advance the commit position associated with the subscriber.
+     *
+     * @param newPosition new commit position
+     * @return future representing the result of the advance
+     */
+    public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
new file mode 100644
index 0000000..b6a0ed1
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.subscription;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+
+/**
+ * Store to manage subscriptions.
+ */
+public interface SubscriptionsStore extends Closeable {
+
+    /**
+     * Get the last committed position stored for <i>subscriberId</i>.
+     *
+     * @param subscriberId
+     *          subscriber id
+     * @return future representing the last committed position.
+     */
+    public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId);
+
+    /**
+     * Get the last committed positions for all subscribers.
+     *
+     * @return future representing the last committed positions for all subscribers.
+     */
+    public CompletableFuture<Map<String, DLSN>> getLastCommitPositions();
+
+    /**
+     * Advance the last committed position for <i>subscriberId</i>.
+     *
+     * @param subscriberId
+     *          subscriber id.
+     * @param newPosition
+     *          new committed position.
+     * @return future representing the result of the advance.
+     */
+    public CompletableFuture<Void> advanceCommitPosition(String subscriberId, DLSN newPosition);
+
+    /**
+     * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the
+     * data stored under this subscriber will be lost.
+     * @param subscriberId subscriber id
+     * @return future representing success or failure:
+     * <code>true</code> only if such a subscriber existed and was removed successfully;
+     * <code>false</code> if there is no such subscriber, or the removal failed.
+     */
+    public CompletableFuture<Boolean> deleteSubscriber(String subscriberId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
index 56a4f2e..34011b5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
@@ -20,20 +20,20 @@ package org.apache.distributedlog.auditor;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.BookKeeperClientBuilder;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.DLUtils;
 import org.apache.bookkeeper.client.BKException;
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -84,13 +85,13 @@ public class DLAuditor {
         this.conf = conf;
     }
 
-    private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) {
+    private ZooKeeperClient getZooKeeperClient(Namespace namespace) {
         NamespaceDriver driver = namespace.getNamespaceDriver();
         assert(driver instanceof BKNamespaceDriver);
         return ((BKNamespaceDriver) driver).getWriterZKC();
     }
 
-    private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) {
+    private BookKeeperClient getBookKeeperClient(Namespace namespace) {
         NamespaceDriver driver = namespace.getNamespaceDriver();
         assert(driver instanceof BKNamespaceDriver);
         return ((BKNamespaceDriver) driver).getReaderBKC();
@@ -169,7 +170,7 @@ public class DLAuditor {
         LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
 
         final Set<Long> ledgers = new HashSet<Long>();
-        final SettableFuture<Void> doneFuture = SettableFuture.create();
+        final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();
 
         BookkeeperInternalCallbacks.Processor<Long> collector =
                 new BookkeeperInternalCallbacks.Processor<Long>() {
@@ -195,9 +196,9 @@ public class DLAuditor {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (BKException.Code.OK == rc) {
-                    doneFuture.set(null);
+                    doneFuture.complete(null);
                 } else {
-                    doneFuture.setException(BKException.create(rc));
+                    doneFuture.completeExceptionally(BKException.create(rc));
                 }
             }
         };
@@ -225,12 +226,12 @@ public class DLAuditor {
     private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths)
             throws IOException {
         final Set<Long> ledgers = new TreeSet<Long>();
-        List<DistributedLogNamespace> namespaces =
-                new ArrayList<DistributedLogNamespace>(uris.size());
+        List<Namespace> namespaces =
+                new ArrayList<Namespace>(uris.size());
         try {
             for (URI uri : uris) {
                 namespaces.add(
-                        DistributedLogNamespaceBuilder.newBuilder()
+                        NamespaceBuilder.newBuilder()
                                 .conf(conf)
                                 .uri(uri)
                                 .build());
@@ -240,8 +241,8 @@ public class DLAuditor {
             ExecutorService executor = Executors.newFixedThreadPool(uris.size());
             try {
                 int i = 0;
-                for (final DistributedLogNamespace namespace : namespaces) {
-                    final DistributedLogNamespace dlNamespace = namespace;
+                for (final Namespace namespace : namespaces) {
+                    final Namespace dlNamespace = namespace;
                     final URI uri = uris.get(i);
                     final List<String> aps = allocationPaths.get(i);
                     i++;
@@ -278,7 +279,7 @@ public class DLAuditor {
                 executor.shutdown();
             }
         } finally {
-            for (DistributedLogNamespace namespace : namespaces) {
+            for (Namespace namespace : namespaces) {
                 namespace.close();
             }
         }
@@ -286,7 +287,7 @@ public class DLAuditor {
     }
 
     private void collectLedgersFromAllocator(final URI uri,
-                                             final DistributedLogNamespace namespace,
+                                             final Namespace namespace,
                                              final List<String> allocationPaths,
                                              final Set<Long> ledgers) throws IOException {
         final LinkedBlockingQueue<String> poolQueue =
@@ -346,7 +347,7 @@ public class DLAuditor {
     }
 
     private void collectLedgersFromDL(final URI uri,
-                                      final DistributedLogNamespace namespace,
+                                      final Namespace namespace,
                                       final Set<Long> ledgers) throws IOException {
         logger.info("Enumerating {} to collect streams.", uri);
         Iterator<String> streams = namespace.getLogs();
@@ -366,7 +367,7 @@ public class DLAuditor {
         });
     }
 
-    private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace,
+    private List<Long> collectLedgersFromStream(Namespace namespace,
                                                 String stream,
                                                 Set<Long> ledgers)
             throws IOException {
@@ -394,7 +395,7 @@ public class DLAuditor {
      */
     public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException {
         logger.info("Collecting stream space usage for {}.", uri);
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+        Namespace namespace = NamespaceBuilder.newBuilder()
                 .conf(conf)
                 .uri(uri)
                 .build();
@@ -406,7 +407,7 @@ public class DLAuditor {
     }
 
     private Map<String, Long> calculateStreamSpaceUsage(
-            final URI uri, final DistributedLogNamespace namespace)
+            final URI uri, final Namespace namespace)
         throws IOException {
         Iterator<String> streams = namespace.getLogs();
         final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
@@ -432,7 +433,7 @@ public class DLAuditor {
         return streamSpaceUsageMap;
     }
 
-    private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace,
+    private long calculateStreamSpaceUsage(final Namespace namespace,
                                            final String stream) throws IOException {
         DistributedLogManager dlm = namespace.openLog(stream);
         long totalBytes = 0;
@@ -504,7 +505,7 @@ public class DLAuditor {
 
         LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
 
-        final SettableFuture<Void> doneFuture = SettableFuture.create();
+        final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();
         final BookKeeper bk = bkc.get();
 
         BookkeeperInternalCallbacks.Processor<Long> collector =
@@ -544,9 +545,9 @@ public class DLAuditor {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (BKException.Code.OK == rc) {
-                    doneFuture.set(null);
+                    doneFuture.complete(null);
                 } else {
-                    doneFuture.setException(BKException.create(rc));
+                    doneFuture.completeExceptionally(BKException.create(rc));
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
index 6ea248b..ee33dc3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
@@ -17,9 +17,10 @@
  */
 package org.apache.distributedlog.bk;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
 import org.apache.bookkeeper.client.LedgerHandle;
 
 import java.io.IOException;
@@ -57,8 +58,8 @@ public class LedgerAllocatorDelegator implements LedgerAllocator {
     }
 
     @Override
-    public Future<Void> delete() {
-        return Future.exception(new UnsupportedOperationException("Can't delete an allocator by delegator"));
+    public CompletableFuture<Void> delete() {
+        return FutureUtils.exception(new UnsupportedOperationException("Can't delete an allocator by delegator"));
     }
 
     @Override
@@ -67,17 +68,17 @@ public class LedgerAllocatorDelegator implements LedgerAllocator {
     }
 
     @Override
-    public Future<LedgerHandle> tryObtain(Transaction<Object> txn,
+    public CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> txn,
                                           OpListener<LedgerHandle> listener) {
         return this.allocator.tryObtain(txn, listener);
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         if (ownAllocator) {
             return this.allocator.asyncClose();
         } else {
-            return Future.value(null);
+            return FutureUtils.value(null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
index 4fff2f6..19c5546 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
@@ -19,17 +19,15 @@ package org.apache.distributedlog.bk;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -40,7 +38,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -320,25 +317,25 @@ public class LedgerAllocatorPool implements LedgerAllocator {
     }
 
     @Override
-    public Future<LedgerHandle> tryObtain(final Transaction<Object> txn,
-                                          final Transaction.OpListener<LedgerHandle> listener) {
+    public CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn,
+                                                     final Transaction.OpListener<LedgerHandle> listener) {
         final SimpleLedgerAllocator allocator;
         synchronized (this) {
             if (allocatingList.isEmpty()) {
-                return Future.exception(new IOException("No ledger allocator available under " + poolPath + "."));
+                return FutureUtils.exception(new IOException("No ledger allocator available under " + poolPath + "."));
             } else {
                 allocator = allocatingList.removeFirst();
             }
         }
 
-        final Promise<LedgerHandle> tryObtainPromise = new Promise<LedgerHandle>();
+        final CompletableFuture<LedgerHandle> tryObtainPromise = new CompletableFuture<LedgerHandle>();
         final FutureEventListener<LedgerHandle> tryObtainListener = new FutureEventListener<LedgerHandle>() {
             @Override
             public void onSuccess(LedgerHandle lh) {
                 synchronized (LedgerAllocatorPool.this) {
                     obtainMap.put(lh, allocator);
                     reverseObtainMap.put(allocator, lh);
-                    tryObtainPromise.setValue(lh);
+                    tryObtainPromise.complete(lh);
                 }
             }
 
@@ -349,7 +346,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
                 } catch (IOException ioe) {
                     logger.info("Failed to rescue allocator {}", allocator.allocatePath, ioe);
                 }
-                tryObtainPromise.setException(cause);
+                tryObtainPromise.completeExceptionally(cause);
             }
         };
 
@@ -365,7 +362,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
                 abortObtain(allocator);
                 listener.onAbort(t);
             }
-        }).addEventListener(tryObtainListener);
+        }).whenComplete(tryObtainListener);
         return tryObtainPromise;
     }
 
@@ -399,7 +396,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         List<LedgerAllocator> allocatorsToClose;
         synchronized (this) {
             allocatorsToClose = Lists.newArrayListWithExpectedSize(
@@ -414,21 +411,15 @@ public class LedgerAllocatorPool implements LedgerAllocator {
                 allocatorsToClose.add(allocator);
             }
         }
-        return FutureUtils.processList(allocatorsToClose, new Function<LedgerAllocator, Future<Void>>() {
-            @Override
-            public Future<Void> apply(LedgerAllocator allocator) {
-                return allocator.asyncClose();
-            }
-        }, scheduledExecutorService).map(new AbstractFunction1<List<Void>, Void>() {
-            @Override
-            public Void apply(List<Void> values) {
-                return null;
-            }
-        });
+        return FutureUtils.processList(
+            allocatorsToClose,
+            allocator -> allocator.asyncClose(),
+            scheduledExecutorService
+        ).thenApply(values -> null);
     }
 
     @Override
-    public Future<Void> delete() {
+    public CompletableFuture<Void> delete() {
         List<LedgerAllocator> allocatorsToDelete;
         synchronized (this) {
             allocatorsToDelete = Lists.newArrayListWithExpectedSize(
@@ -443,16 +434,10 @@ public class LedgerAllocatorPool implements LedgerAllocator {
                 allocatorsToDelete.add(allocator);
             }
         }
-        return FutureUtils.processList(allocatorsToDelete, new Function<LedgerAllocator, Future<Void>>() {
-            @Override
-            public Future<Void> apply(LedgerAllocator allocator) {
-                return allocator.delete();
-            }
-        }, scheduledExecutorService).flatMap(new AbstractFunction1<List<Void>, Future<Void>>() {
-            @Override
-            public Future<Void> apply(List<Void> values) {
-                return Utils.zkDelete(zkc, poolPath, new ZkVersion(-1));
-            }
-        });
+        return FutureUtils.processList(
+            allocatorsToDelete,
+            allocator -> allocator.delete(),
+            scheduledExecutorService
+        ).thenCompose(values -> Utils.zkDelete(zkc, poolPath, new ZkVersion(-1)));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index e0102f3..144b0a6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -18,19 +18,20 @@
 package org.apache.distributedlog.bk;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
 import org.apache.distributedlog.BookKeeperClient;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.util.Transaction;
 import org.apache.distributedlog.util.Transaction.OpListener;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.util.Utils;
 import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.bookkeeper.versioning.Version;
@@ -40,9 +41,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
 import java.util.LinkedList;
@@ -96,7 +94,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     // version
     ZkVersion version = new ZkVersion(-1);
     // outstanding allocation
-    Promise<LedgerHandle> allocatePromise;
+    CompletableFuture<LedgerHandle> allocatePromise;
     // outstanding tryObtain transaction
     Transaction<Object> tryObtainTxn = null;
     OpListener<LedgerHandle> tryObtainListener = null;
@@ -105,73 +103,71 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     // Allocated Ledger
     LedgerHandle allocatedLh = null;
 
-    Future<Void> closeFuture = null;
-    final LinkedList<Future<Void>> ledgerDeletions =
-            new LinkedList<Future<Void>>();
+    CompletableFuture<Void> closeFuture = null;
+    final LinkedList<CompletableFuture<Void>> ledgerDeletions =
+            new LinkedList<CompletableFuture<Void>>();
 
     // Ledger configuration
     private final QuorumConfigProvider quorumConfigProvider;
 
-    static Future<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
+    static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
                                                                 final ZooKeeperClient zkc) {
         return Utils.zkGetData(zkc, allocatePath, false)
-                .flatMap(new AbstractFunction1<Versioned<byte[]>, Future<Versioned<byte[]>>>() {
+                .thenCompose(new Function<Versioned<byte[]>, CompletionStage<Versioned<byte[]>>>() {
             @Override
-            public Future<Versioned<byte[]>> apply(Versioned<byte[]> result) {
+            public CompletableFuture<Versioned<byte[]>> apply(Versioned<byte[]> result) {
                 if (null != result && null != result.getVersion() && null != result.getValue()) {
-                    return Future.value(result);
+                    return FutureUtils.value(result);
                 }
                 return createAllocationData(allocatePath, zkc);
             }
         });
     }
 
-    private static Future<Versioned<byte[]>> createAllocationData(final String allocatePath,
+    private static CompletableFuture<Versioned<byte[]>> createAllocationData(final String allocatePath,
                                                                   final ZooKeeperClient zkc) {
         try {
-            final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>();
+            final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
             zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
                     zkc.getDefaultACL(), CreateMode.PERSISTENT,
                     new org.apache.zookeeper.AsyncCallback.Create2Callback() {
                         @Override
                         public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
                             if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.setValue(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
+                                promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
                                         new ZkVersion(stat.getVersion())));
                             } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                Utils.zkGetData(zkc, allocatePath, false).proxyTo(promise);
+                                FutureUtils.proxyTo(
+                                  Utils.zkGetData(zkc, allocatePath, false),
+                                  promise
+                                );
                             } else {
-                                promise.setException(FutureUtils.zkException(
+                                promise.completeExceptionally(Utils.zkException(
                                         KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
                             }
                         }
                     }, null);
             return promise;
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            return Future.exception(FutureUtils.zkException(e, allocatePath));
+            return FutureUtils.exception(Utils.zkException(e, allocatePath));
         } catch (InterruptedException e) {
-            return Future.exception(FutureUtils.zkException(e, allocatePath));
+            return FutureUtils.exception(Utils.zkException(e, allocatePath));
         }
     }
 
-    public static Future<SimpleLedgerAllocator> of(final String allocatePath,
+    public static CompletableFuture<SimpleLedgerAllocator> of(final String allocatePath,
                                                    final Versioned<byte[]> allocationData,
                                                    final QuorumConfigProvider quorumConfigProvider,
                                                    final ZooKeeperClient zkc,
                                                    final BookKeeperClient bkc) {
         if (null != allocationData && null != allocationData.getValue()
                 && null != allocationData.getVersion()) {
-            return Future.value(new SimpleLedgerAllocator(allocatePath, allocationData,
+            return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, allocationData,
                     quorumConfigProvider, zkc, bkc));
         }
         return getAndCreateAllocationData(allocatePath, zkc)
-                .map(new AbstractFunction1<Versioned<byte[]>, SimpleLedgerAllocator>() {
-            @Override
-            public SimpleLedgerAllocator apply(Versioned<byte[]> allocationData) {
-                return new SimpleLedgerAllocator(allocatePath, allocationData,
-                        quorumConfigProvider, zkc, bkc);
-            }
-        });
+            .thenApply(allocationData1 -> new SimpleLedgerAllocator(allocatePath, allocationData1,
+                        quorumConfigProvider, zkc, bkc));
     }
 
     /**
@@ -240,14 +236,14 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     }
 
     @Override
-    public synchronized Future<LedgerHandle> tryObtain(final Transaction<Object> txn,
-                                                       final OpListener<LedgerHandle> listener) {
+    public synchronized CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn,
+                                                                  final OpListener<LedgerHandle> listener) {
         if (Phase.ERROR == phase) {
-            return Future.exception(new AllocationException(Phase.ERROR,
+            return FutureUtils.exception(new AllocationException(Phase.ERROR,
                     "Error on allocating ledger under " + allocatePath));
         }
         if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null != tryObtainTxn) {
-            return Future.exception(new ConcurrentObtainException(phase,
+            return FutureUtils.exception(new ConcurrentObtainException(phase,
                     "Ledger handle is handling over to another thread : " + phase));
         }
         tryObtainTxn = txn;
@@ -328,13 +324,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
             return;
         }
         setPhase(Phase.ALLOCATING);
-        allocatePromise = new Promise<LedgerHandle>();
+        allocatePromise = new CompletableFuture<LedgerHandle>();
         QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig();
         bkc.createLedger(
                 quorumConfig.getEnsembleSize(),
                 quorumConfig.getWriteQuorumSize(),
                 quorumConfig.getAckQuorumSize()
-        ).addEventListener(this);
+        ).whenComplete(this);
     }
 
     private synchronized void completeAllocation(LedgerHandle lh) {
@@ -347,11 +343,11 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
         ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this);
         tryObtainTxn.addOp(commitOp);
         setPhase(Phase.HANDING_OVER);
-        FutureUtils.setValue(allocatePromise, lh);
+        allocatePromise.complete(lh);
     }
 
     private synchronized void failAllocation(Throwable cause) {
-        FutureUtils.setException(allocatePromise, cause);
+        allocatePromise.completeExceptionally(cause);
     }
 
     @Override
@@ -386,7 +382,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     private void markAsAllocated(final LedgerHandle lh) {
         byte[] data = DLUtils.logSegmentId2Bytes(lh.getId());
         Utils.zkSetData(zkc, allocatePath, data, getVersion())
-            .addEventListener(new FutureEventListener<ZkVersion>() {
+            .whenComplete(new FutureEventListener<ZkVersion>() {
                 @Override
                 public void onSuccess(ZkVersion version) {
                     // we only issue deleting ledger left from previous allocation when we could allocate first ledger
@@ -411,27 +407,20 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     }
 
     void deleteLedger(final long ledgerId) {
-        final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
+        final CompletableFuture<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
         synchronized (ledgerDeletions) {
             ledgerDeletions.add(deleteFuture);
         }
-        deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable cause) {
+        deleteFuture.whenComplete((value, cause) -> {
+            if (null != cause) {
                 LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ",
                         new Object[] { ledgerId, allocatePath, cause });
                 if (!isClosing()) {
                     deleteLedger(ledgerId);
                 }
-                return BoxedUnit.UNIT;
             }
-        }).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                synchronized (ledgerDeletions) {
-                    ledgerDeletions.remove(deleteFuture);
-                }
-                return BoxedUnit.UNIT;
+            synchronized (ledgerDeletions) {
+                ledgerDeletions.remove(deleteFuture);
             }
         });
     }
@@ -440,25 +429,25 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
         return closeFuture != null;
     }
 
-    private Future<Void> closeInternal(boolean cleanup) {
-        Promise<Void> closePromise;
+    private CompletableFuture<Void> closeInternal(boolean cleanup) {
+        CompletableFuture<Void> closePromise;
         synchronized (this) {
             if (null != closeFuture) {
                 return closeFuture;
             }
-            closePromise = new Promise<Void>();
+            closePromise = new CompletableFuture<Void>();
             closeFuture = closePromise;
         }
         if (!cleanup) {
             LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath);
-            FutureUtils.setValue(closePromise, null);
+            closePromise.complete(null);
             return closePromise;
         }
         cleanupAndClose(closePromise);
         return closePromise;
     }
 
-    private void cleanupAndClose(final Promise<Void> closePromise) {
+    private void cleanupAndClose(final CompletableFuture<Void> closePromise) {
         LOG.info("Closing ledger allocator on {}.", allocatePath);
         final ZKTransaction txn = new ZKTransaction(zkc);
         // try obtain ledger handle
@@ -476,21 +465,21 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
             }
 
             private void complete() {
-                FutureUtils.setValue(closePromise, null);
+                closePromise.complete(null);
                 LOG.info("Closed ledger allocator on {}.", allocatePath);
             }
-        }).addEventListener(new FutureEventListener<LedgerHandle>() {
+        }).whenComplete(new FutureEventListener<LedgerHandle>() {
             @Override
             public void onSuccess(LedgerHandle lh) {
                 // try obtain succeed
                 // if we could obtain the ledger handle, we have the responsibility to close it
                 deleteLedger(lh.getId());
                 // wait for deletion to be completed
-                List<Future<Void>> outstandingDeletions;
+                List<CompletableFuture<Void>> outstandingDeletions;
                 synchronized (ledgerDeletions) {
                     outstandingDeletions = Lists.newArrayList(ledgerDeletions);
                 }
-                Future.collect(outstandingDeletions).addEventListener(new FutureEventListener<List<Void>>() {
+                FutureUtils.collect(outstandingDeletions).whenComplete(new FutureEventListener<List<Void>>() {
                     @Override
                     public void onSuccess(List<Void> values) {
                         txn.execute();
@@ -499,7 +488,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
                     @Override
                     public void onFailure(Throwable cause) {
                         LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                        FutureUtils.setValue(closePromise, null);
+                        closePromise.complete(null);
                     }
                 });
             }
@@ -507,7 +496,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
             @Override
             public void onFailure(Throwable cause) {
                 LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                FutureUtils.setValue(closePromise, null);
+                closePromise.complete(null);
             }
         });
 
@@ -519,18 +508,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
     }
 
     @Override
-    public Future<Void> asyncClose() {
+    public CompletableFuture<Void> asyncClose() {
         return closeInternal(false);
     }
 
     @Override
-    public Future<Void> delete() {
-        return closeInternal(true).flatMap(new AbstractFunction1<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void value) {
-                return Utils.zkDelete(zkc, allocatePath, getVersion());
-            }
-        });
+    public CompletableFuture<Void> delete() {
+        return closeInternal(true).thenCompose(value -> Utils.zkDelete(zkc, allocatePath, getVersion()));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java b/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
deleted file mode 100644
index dccd2e8..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.callback;
-
-/**
- * ReadAhead Callback
- */
-public interface ReadAheadCallback {
-    void resumeReadAhead();
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
deleted file mode 100644
index f189ad3..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.configuration.AbstractConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Configuration view built on concurrent hash map for fast thread-safe access.
- * Notes:
- * 1. Multi-property list aggregation will not work in this class. I.e. commons config
- * normally combines all properties with the same key into one list property automatically.
- * This class simply overwrites any existing mapping.
- */
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
-    static final Logger LOG = LoggerFactory.getLogger(ConcurrentBaseConfiguration.class);
-
-    private final ConcurrentHashMap<String, Object> map;
-
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
-    }
-
-    @Override
-    protected void addPropertyDirect(String key, Object value) {
-        Preconditions.checkNotNull(value);
-        map.put(key, value);
-    }
-
-    @Override
-    public Object getProperty(String key) {
-        return map.get(key);
-    }
-
-    @Override
-    public Iterator getKeys() {
-        return map.keySet().iterator();
-    }
-
-    @Override
-    public boolean containsKey(String key) {
-        return map.containsKey(key);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    protected void clearPropertyDirect(String key) {
-        map.remove(key);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
deleted file mode 100644
index 4e7f886..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.Configuration;
-
-/**
- * Invariant thread-safe view of some configuration.
- */
-public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration {
-    public ConcurrentConstConfiguration(Configuration conf) {
-        Preconditions.checkNotNull(conf);
-        copy(conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
deleted file mode 100644
index 70059d4..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-/**
- * Configuration listener triggered when reloading configuration settings.
- */
-public interface ConfigurationListener {
-
-    /**
-     * Reload the configuration.
-     *
-     * @param conf configuration to reload
-     */
-    void onReload(ConcurrentBaseConfiguration conf);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
deleted file mode 100644
index 0e5c897..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import java.io.FileNotFoundException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.Iterator;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class
- * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the
- * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only
- * be reloaded if something changed.
- * Notes:
- * 1. Reload schedule is never terminated. The assumption is a finite number of these are started
- * at the calling layer, and terminated only once the executor service is shut down.
- * 2. The underlying FileConfiguration is not at all thread-safe, so its important to ensure access
- * to this object is always single threaded.
- */
-public class ConfigurationSubscription {
-    static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class);
-
-    private final ConcurrentBaseConfiguration viewConfig;
-    private final ScheduledExecutorService executorService;
-    private final int reloadPeriod;
-    private final TimeUnit reloadUnit;
-    private final List<FileConfigurationBuilder> fileConfigBuilders;
-    private final List<FileConfiguration> fileConfigs;
-    private final CopyOnWriteArraySet<ConfigurationListener> confListeners;
-
-    public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig,
-                                     List<FileConfigurationBuilder> fileConfigBuilders,
-                                     ScheduledExecutorService executorService,
-                                     int reloadPeriod,
-                                     TimeUnit reloadUnit)
-            throws ConfigurationException {
-        Preconditions.checkNotNull(fileConfigBuilders);
-        Preconditions.checkArgument(!fileConfigBuilders.isEmpty());
-        Preconditions.checkNotNull(executorService);
-        Preconditions.checkNotNull(viewConfig);
-        this.viewConfig = viewConfig;
-        this.executorService = executorService;
-        this.reloadPeriod = reloadPeriod;
-        this.reloadUnit = reloadUnit;
-        this.fileConfigBuilders = fileConfigBuilders;
-        this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size());
-        this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>();
-        reload();
-        scheduleReload();
-    }
-
-    public void registerListener(ConfigurationListener listener) {
-        this.confListeners.add(listener);
-    }
-
-    public void unregisterListener(ConfigurationListener listener) {
-        this.confListeners.remove(listener);
-    }
-
-    private boolean initConfig() {
-        if (fileConfigs.isEmpty()) {
-            try {
-                for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) {
-                    FileConfiguration fileConfig = fileConfigBuilder.getConfiguration();
-                    FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
-                    reloadingStrategy.setRefreshDelay(0);
-                    fileConfig.setReloadingStrategy(reloadingStrategy);
-                    fileConfigs.add(fileConfig);
-                }
-            } catch (ConfigurationException ex) {
-                if (!fileNotFound(ex)) {
-                    LOG.error("Config init failed {}", ex);
-                }
-            }
-        }
-        return !fileConfigs.isEmpty();
-    }
-
-    private void scheduleReload() {
-        executorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                reload();
-            }
-        }, 0, reloadPeriod, reloadUnit);
-    }
-
-    @VisibleForTesting
-    void reload() {
-        // No-op if already loaded.
-        if (!initConfig()) {
-            return;
-        }
-        // Reload if config exists.
-        Set<String> confKeys = Sets.newHashSet();
-        for (FileConfiguration fileConfig : fileConfigs) {
-            LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(),
-                    fileConfig.getFile().lastModified());
-            fileConfig.reload();
-            // load keys
-            Iterator keyIter = fileConfig.getKeys();
-            while (keyIter.hasNext()) {
-                String key = (String) keyIter.next();
-                confKeys.add(key);
-            }
-        }
-        // clear unexisted keys
-        Iterator viewIter = viewConfig.getKeys();
-        while (viewIter.hasNext()) {
-            String key = (String) viewIter.next();
-            if (!confKeys.contains(key)) {
-                clearViewProperty(key);
-            }
-        }
-        LOG.info("Reload features : {}", confKeys);
-        // load keys from files
-        for (FileConfiguration fileConfig : fileConfigs) {
-            try {
-                loadView(fileConfig);
-            } catch (Exception ex) {
-                if (!fileNotFound(ex)) {
-                    LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex);
-                }
-            }
-        }
-        for (ConfigurationListener listener : confListeners) {
-            listener.onReload(viewConfig);
-        }
-    }
-
-    private boolean fileNotFound(Exception ex) {
-        return ex instanceof FileNotFoundException ||
-                ex.getCause() != null && ex.getCause() instanceof FileNotFoundException;
-    }
-
-    private void loadView(FileConfiguration fileConfig) {
-        Iterator fileIter = fileConfig.getKeys();
-        while (fileIter.hasNext()) {
-            String key = (String) fileIter.next();
-            setViewProperty(fileConfig, key, fileConfig.getProperty(key));
-        }
-    }
-
-    private void clearViewProperty(String key) {
-        LOG.debug("Removing property, key={}", key);
-        viewConfig.clearProperty(key);
-    }
-
-    private void setViewProperty(FileConfiguration fileConfig,
-                                 String key,
-                                 Object value) {
-        if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) {
-            LOG.debug("Setting property, key={} value={}", key, fileConfig.getProperty(key));
-            viewConfig.setProperty(key, fileConfig.getProperty(key));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
index c77778a..c69b7a5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 import org.apache.distributedlog.DistributedLogConfiguration;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.net.MalformedURLException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -35,6 +34,11 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.ConfigurationException;
 
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
index ea7f4a7..9e760c5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog.config;
 
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.bk.QuorumConfig;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
 
 import static org.apache.distributedlog.DistributedLogConfiguration.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
deleted file mode 100644
index dbf8fe7..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-
-/**
- * Abstract out FileConfiguration subclass construction.
- */
-public interface FileConfigurationBuilder {
-    FileConfiguration getConfiguration() throws ConfigurationException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
deleted file mode 100644
index df1408c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import java.net.URL;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-/**
- * Hide PropertiesConfiguration dependency.
- */
-public class PropertiesConfigurationBuilder implements FileConfigurationBuilder {
-    private URL url;
-
-    public PropertiesConfigurationBuilder(URL url) {
-        this.url = url;
-    }
-
-    @Override
-    public FileConfiguration getConfiguration() throws ConfigurationException {
-        return new PropertiesConfiguration(url);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
index 83cac22..f51302e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.feature;
 
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
 import org.apache.bookkeeper.feature.CacheableFeatureProvider;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
index 4689d51..201ed8a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
@@ -21,11 +21,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
-import org.apache.distributedlog.config.ConfigurationListener;
-import org.apache.distributedlog.config.ConfigurationSubscription;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationListener;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeature;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
deleted file mode 100644
index b1adf4a..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-import scala.Function0;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
-/**
- * Function to close {@link org.apache.distributedlog.io.AsyncCloseable}
- */
-public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> {
-
-    /**
-     * Return a function to close an {@link AsyncCloseable}.
-     *
-     * @param closeable closeable to close
-     * @return function to close an {@link AsyncCloseable}
-     */
-    public static Function0<BoxedUnit> of(AsyncCloseable closeable) {
-        return new CloseAsyncCloseableFunction(closeable);
-    }
-
-    private final AsyncCloseable closeable;
-
-    private CloseAsyncCloseableFunction(AsyncCloseable closeable) {
-        this.closeable = closeable;
-    }
-
-    @Override
-    public BoxedUnit apply() {
-        closeable.asyncClose();
-        return BoxedUnit.UNIT;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
deleted file mode 100644
index 6360f2c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import scala.runtime.AbstractFunction1;
-
-/**
- * Map Function return default value
- */
-public class DefaultValueMapFunction<T, R> extends AbstractFunction1<T, R> {
-
-    public static <T, R> DefaultValueMapFunction<T, R> of(R defaultValue) {
-        return new DefaultValueMapFunction<T, R>(defaultValue);
-    }
-
-    private final R defaultValue;
-
-    private DefaultValueMapFunction(R defaultValue) {
-        this.defaultValue = defaultValue;
-    }
-
-    @Override
-    public R apply(T any) {
-        return defaultValue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
index 1bf620c..00703e3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
@@ -17,16 +17,15 @@
  */
 package org.apache.distributedlog.function;
 
+import java.util.List;
+import java.util.function.Function;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.LogSegmentMetadata;
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
 
 /**
  * Retrieve the last tx id from list of log segments
  */
-public class GetLastTxIdFunction extends AbstractFunction1<List<LogSegmentMetadata>, Long> {
+public class GetLastTxIdFunction implements Function<List<LogSegmentMetadata>, Long> {
 
     public static final GetLastTxIdFunction INSTANCE = new GetLastTxIdFunction();
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
deleted file mode 100644
index 98164de..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
-
-/**
- * Function to get the versioned value from {@link org.apache.bookkeeper.versioning.Versioned}
- */
-public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> {
-
-    public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>>
-            GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>();
-
-    @Override
-    public T apply(Versioned<T> versionedValue) {
-        return versionedValue.getValue();
-    }
-}