You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:16 UTC

[11/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
new file mode 100644
index 0000000..00e6b5c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -0,0 +1,1106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.function.CloseAsyncCloseableFunction;
+import org.apache.distributedlog.function.GetVersionedValueFunction;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
+import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
+import org.apache.distributedlog.metadata.LogMetadataForReader;
+import org.apache.distributedlog.metadata.LogMetadataForWriter;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.lock.DistributedLock;
+import org.apache.distributedlog.lock.NopDistributedLock;
+import org.apache.distributedlog.lock.ZKDistributedLock;
+import org.apache.distributedlog.logsegment.LogSegmentFilter;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.metadata.LogStreamMetadataStore;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.util.Allocator;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.MonitoredFuturePool;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.ExceptionalFunction;
+import com.twitter.util.ExceptionalFunction0;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER;
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+
+/**
+ * <h3>Metrics</h3>
+ * <ul>
+ * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
+ * See {@link BKAsyncLogWriter} for detail stats.
+ * <li> `async_reader/*`: all asynchronous reader related metrics are exposed under scope `async_reader`.
+ * See {@link BKAsyncLogReader} for detail stats.
+ * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under
+ * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
+ * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
+ * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats.
+ * <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail
+ * stats.
+ * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for
+ * detail stats.
+ * <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details.
+ * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details.
+ * <li> `readahead_worker/*`: metrics about readahead workers used by readers. See {@link BKLogReadHandler}
+ * for details.
+ * </ul>
+ */
+class BKDistributedLogManager implements DistributedLogManager {
+    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class);
+
+    // Extracts the transaction id from a log record; shared so callers can map
+    // a Future<LogRecordWithDLSN> to a Future<Long> without allocating a new function.
+    static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION =
+            new Function<LogRecordWithDLSN, Long>() {
+                @Override
+                public Long apply(LogRecordWithDLSN record) {
+                    return record.getTransactionId();
+                }
+            };
+
+    // Extracts the DLSN from a log record; counterpart of RECORD_2_TXID_FUNCTION
+    // for callers that need sequence numbers rather than transaction ids.
+    static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION =
+            new Function<LogRecordWithDLSN, DLSN>() {
+                @Override
+                public DLSN apply(LogRecordWithDLSN record) {
+                    return record.getDlsn();
+                }
+            };
+
+    private final URI uri;
+    private final String name;
+    private final String clientId;
+    private final int regionId;
+    private final String streamIdentifier;
+    private final DistributedLogConfiguration conf;
+    private final DynamicDistributedLogConfiguration dynConf;
+    private final NamespaceDriver driver;
+    // Non-null once close has been initiated; read under synchronized(this)
+    // (see checkClosedOrInError) to reject operations on a closed manager.
+    private Promise<Void> closePromise;
+    private final OrderedScheduler scheduler;
+    private final FeatureProvider featureProvider;
+    private final AsyncFailureInjector failureInjector;
+    private final StatsLogger statsLogger;
+    private final StatsLogger perLogStatsLogger;
+    final AlertStatsLogger alertStatsLogger;
+
+    // log segment metadata cache
+    private final LogSegmentMetadataCache logSegmentMetadataCache;
+
+    //
+    // Writer Related Variables
+    //
+    private final PermitLimiter writeLimiter;
+
+    //
+    // Reader Related Variables
+    //
+    // Shared read handler backing registered log segment listeners; lazily
+    // created and guarded by synchronized(this).
+    private BKLogReadHandler readHandlerForListener = null;
+    private final PendingReaders pendingReaders;
+
+    // resource to close together with this manager (if any)
+    private final Optional<AsyncCloseable> resourcesCloseable;
+
+    /**
+     * Create a {@link DistributedLogManager} with supplied resources.
+     *
+     * @param name log name
+     * @param conf distributedlog configuration
+     * @param dynConf dynamic distributedlog configuration
+     * @param uri uri location for the log
+     * @param driver namespace driver
+     * @param logSegmentMetadataCache log segment metadata cache
+     * @param scheduler ordered scheduled used by readers and writers
+     * @param clientId client id that used to initiate the locks
+     * @param regionId region id that would be encoded as part of log segment metadata
+     *                 to indicate which region that the log segment will be created
+     * @param writeLimiter write limiter
+     * @param featureProvider provider to offer features
+     * @param failureInjector injector used to simulate failures for testing
+     * @param statsLogger stats logger to receive stats
+     * @param perLogStatsLogger stats logger to receive per log stats
+     * @param resourcesCloseable optional resource to be closed along with this manager
+     * @throws IOException
+     */
+    BKDistributedLogManager(String name,
+                            DistributedLogConfiguration conf,
+                            DynamicDistributedLogConfiguration dynConf,
+                            URI uri,
+                            NamespaceDriver driver,
+                            LogSegmentMetadataCache logSegmentMetadataCache,
+                            OrderedScheduler scheduler,
+                            String clientId,
+                            Integer regionId,
+                            PermitLimiter writeLimiter,
+                            FeatureProvider featureProvider,
+                            AsyncFailureInjector failureInjector,
+                            StatsLogger statsLogger,
+                            StatsLogger perLogStatsLogger,
+                            Optional<AsyncCloseable> resourcesCloseable) {
+        this.name = name;
+        this.conf = conf;
+        this.dynConf = dynConf;
+        this.uri = uri;
+        this.driver = driver;
+        this.logSegmentMetadataCache = logSegmentMetadataCache;
+        this.scheduler = scheduler;
+        this.statsLogger = statsLogger;
+        // per-log stats are broadcast to both the per-log and the shared stats logger
+        this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger);
+        this.pendingReaders = new PendingReaders(scheduler);
+        // NOTE(review): auto-unboxing — a null regionId would NPE here; assumes
+        // callers always pass a non-null value — TODO confirm.
+        this.regionId = regionId;
+        this.clientId = clientId;
+        this.streamIdentifier = conf.getUnpartitionedStreamName();
+        this.writeLimiter = writeLimiter;
+        // Feature Provider
+        this.featureProvider = featureProvider;
+        // Failure Injector
+        this.failureInjector = failureInjector;
+        // Stats
+        this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert");
+        this.resourcesCloseable = resourcesCloseable;
+    }
+
+    @Override
+    public String getStreamName() {
+        return name;
+    }
+
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return driver;
+    }
+
+    // Package-private accessors used by the writers/readers this manager creates.
+
+    URI getUri() {
+        return uri;
+    }
+
+    DistributedLogConfiguration getConf() {
+        return conf;
+    }
+
+    OrderedScheduler getScheduler() {
+        return scheduler;
+    }
+
+    AsyncFailureInjector getFailureInjector() {
+        return failureInjector;
+    }
+
+    //
+    // Test Methods
+    //
+
+    /** Writer-side metadata store, exposed for tests only. */
+    @VisibleForTesting
+    LogStreamMetadataStore getWriterMetadataStore() {
+        return driver.getLogStreamMetadataStore(WRITER);
+    }
+
+    /** Reader-side entry store, exposed for tests only. */
+    @VisibleForTesting
+    LogSegmentEntryStore getReaderEntryStore() {
+        return driver.getLogSegmentEntryStore(READER);
+    }
+
+    /** Feature provider, exposed for tests only. */
+    @VisibleForTesting
+    FeatureProvider getFeatureProvider() {
+        return this.featureProvider;
+    }
+
+    /**
+     * Return the shared listener read handler, lazily creating it when
+     * {@code create} is true, and register {@code listener} on it.
+     *
+     * <p>On first creation the handler also starts fetching log segments so
+     * registered listeners begin receiving updates. Returns null if the handler
+     * does not exist yet and {@code create} is false.
+     *
+     * NOTE(review): on the create path the listener is registered without a
+     * null check (processReaderOperation passes null) — presumably
+     * BKLogReadHandler.registerListener tolerates null; TODO confirm.
+     */
+    private synchronized BKLogReadHandler getReadHandlerAndRegisterListener(
+            boolean create, LogSegmentListener listener) {
+        if (null == readHandlerForListener && create) {
+            readHandlerForListener = createReadHandler();
+            readHandlerForListener.registerListener(listener);
+            // start fetch the log segments after created the listener
+            readHandlerForListener.asyncStartFetchLogSegments();
+            return readHandlerForListener;
+        }
+        if (null != readHandlerForListener && null != listener) {
+            readHandlerForListener.registerListener(listener);
+        }
+        return readHandlerForListener;
+    }
+
+    @Override
+    public List<LogSegmentMetadata> getLogSegments() throws IOException {
+        // synchronous wrapper over the async variant
+        return FutureUtils.result(getLogSegmentsAsync());
+    }
+
+    /**
+     * Fetch the log segment list from the metadata store.
+     *
+     * <p>Creates a throwaway read handler for the fetch and closes it once the
+     * future completes (success or failure) via the ensure() callback.
+     */
+    protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() {
+        final BKLogReadHandler readHandler = createReadHandler();
+        return readHandler.readLogSegmentsFromStore(
+                LogSegmentMetadata.COMPARATOR,
+                LogSegmentFilter.DEFAULT_FILTER,
+                null)
+                .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC)
+                .ensure(CloseAsyncCloseableFunction.of(readHandler));
+    }
+
+    @Override
+    public void registerListener(LogSegmentListener listener) throws IOException {
+        // creates the shared read handler on demand
+        getReadHandlerAndRegisterListener(true, listener);
+    }
+
+    @Override
+    public synchronized void unregisterListener(LogSegmentListener listener) {
+        // no-op if no handler was ever created
+        if (null != readHandlerForListener) {
+            readHandlerForListener.unregisterListener(listener);
+        }
+    }
+
+    /**
+     * Reject an operation if this manager has already been closed.
+     *
+     * @param operation name of the attempted operation, used in the error message
+     * @throws AlreadyClosedException if close has been initiated on this manager
+     */
+    public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
+        if (null != closePromise) {
+            throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager");
+        }
+    }
+
+    // Create Read Handler
+
+    /** Create a read handler with no subscriber and not marked for reading. */
+    synchronized BKLogReadHandler createReadHandler() {
+        Optional<String> subscriberId = Optional.absent();
+        return createReadHandler(subscriberId, false);
+    }
+
+    /** Create a read handler for the given subscriber, not marked for reading. */
+    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) {
+        return createReadHandler(subscriberId, false);
+    }
+
+    /** Create a read handler without an async notification callback. */
+    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
+                                                    boolean isHandleForReading) {
+        return createReadHandler(
+                subscriberId,
+                null,
+                isHandleForReading);
+    }
+
+    /**
+     * Create a read handler — the full-parameter variant the other overloads
+     * delegate to. Wires the handler with the READER-role metadata/entry stores
+     * from the namespace driver and this manager's stats loggers.
+     */
+    synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId,
+                                                    AsyncNotification notification,
+                                                    boolean isHandleForReading) {
+        LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier);
+        return new BKLogReadHandler(
+                logMetadata,
+                subscriberId,
+                conf,
+                dynConf,
+                driver.getLogStreamMetadataStore(READER),
+                logSegmentMetadataCache,
+                driver.getLogSegmentEntryStore(READER),
+                scheduler,
+                alertStatsLogger,
+                statsLogger,
+                perLogStatsLogger,
+                clientId,
+                notification,
+                isHandleForReading);
+    }
+
+    // Create Write Handler
+
+    /** Synchronous wrapper over {@link #asyncCreateWriteHandler(boolean)}. */
+    public BKLogWriteHandler createWriteHandler(boolean lockHandler)
+            throws IOException {
+        return FutureUtils.result(asyncCreateWriteHandler(lockHandler));
+    }
+
+    /**
+     * Asynchronously create a write handler.
+     *
+     * <p>First fetches (and, if configured, creates) the log metadata from the
+     * WRITER-role metadata store, then builds the handler from it.
+     *
+     * @param lockHandler whether to acquire the write lock before completing
+     */
+    Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) {
+        // Fetching Log Metadata (create if not exists)
+        return driver.getLogStreamMetadataStore(WRITER).getLog(
+                uri,
+                name,
+                true,
+                conf.getCreateStreamIfNotExists()
+        ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() {
+            @Override
+            public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) {
+                Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>();
+                createWriteHandler(logMetadata, lockHandler, createPromise);
+                return createPromise;
+            }
+        });
+    }
+
+    /**
+     * Build a write handler from fetched log metadata and complete
+     * {@code createPromise} with it.
+     *
+     * <p>Steps: build the write lock (or a no-op lock when write locking is
+     * disabled), create the log segment allocator, construct the handler, then
+     * — if {@code lockHandler} is set — acquire the lock before completing the
+     * promise. On lock failure the handler is closed before the promise is
+     * failed, so no half-initialized handler leaks.
+     */
+    private void createWriteHandler(LogMetadataForWriter logMetadata,
+                                    boolean lockHandler,
+                                    final Promise<BKLogWriteHandler> createPromise) {
+        // Build the locks
+        DistributedLock lock;
+        if (conf.isWriteLockEnabled()) {
+            lock = driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata);
+        } else {
+            lock = NopDistributedLock.INSTANCE;
+        }
+
+        Allocator<LogSegmentEntryWriter, Object> segmentAllocator;
+        try {
+            segmentAllocator = driver.getLogSegmentEntryStore(WRITER)
+                    .newLogSegmentAllocator(logMetadata, dynConf);
+        } catch (IOException ioe) {
+            // fail fast: no handler was constructed yet, nothing to clean up
+            FutureUtils.setException(createPromise, ioe);
+            return;
+        }
+
+        // Make sure writer handler created before resources are initialized
+        final BKLogWriteHandler writeHandler = new BKLogWriteHandler(
+                logMetadata,
+                conf,
+                driver.getLogStreamMetadataStore(WRITER),
+                logSegmentMetadataCache,
+                driver.getLogSegmentEntryStore(WRITER),
+                scheduler,
+                segmentAllocator,
+                statsLogger,
+                perLogStatsLogger,
+                alertStatsLogger,
+                clientId,
+                regionId,
+                writeLimiter,
+                featureProvider,
+                dynConf,
+                lock);
+        if (lockHandler) {
+            writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() {
+                @Override
+                public void onSuccess(DistributedLock lock) {
+                    FutureUtils.setValue(createPromise, writeHandler);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    // close the handler first, then surface the lock failure
+                    writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
+                        @Override
+                        public BoxedUnit apply() {
+                            FutureUtils.setException(createPromise, cause);
+                            return BoxedUnit.UNIT;
+                        }
+                    });
+                }
+            });
+        } else {
+            FutureUtils.setValue(createPromise, writeHandler);
+        }
+    }
+
+    /** Permit manager gating log segment rolling, from the WRITER metadata store. */
+    PermitManager getLogSegmentRollingPermitManager() {
+        return driver.getLogStreamMetadataStore(WRITER).getPermitManager();
+    }
+
+    /**
+     * Run {@code func} against the shared listener read handler (created on
+     * demand). The handler lookup is scheduled on the ordered scheduler, so the
+     * returned future completes off the caller's thread.
+     */
+    <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) {
+        return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() {
+            @Override
+            public BKLogReadHandler applyE() throws Throwable {
+                return getReadHandlerAndRegisterListener(true, null);
+            }
+        }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() {
+            @Override
+            public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable {
+                return func.apply(readHandler);
+            }
+        });
+    }
+
+    /**
+     * Check if an end of stream marker was added to the stream.
+     * A stream with an end of stream marker cannot be appended to.
+     *
+     * @return true if the marker was added to the stream, false otherwise
+     * @throws AlreadyClosedException if this manager has been closed
+     * @throws IOException on error reading the last log record
+     */
+    @Override
+    public boolean isEndOfStreamMarked() throws IOException {
+        checkClosedOrInError("isEndOfStreamMarked");
+        // the end-of-stream marker is written with MAX_TXID as its transaction id
+        long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId();
+        return lastTxId == DistributedLogConstants.MAX_TXID;
+    }
+
+    /**
+     * Begin appending to the end of the log stream which is being treated as a
+     * sequence of bytes.
+     *
+     * <p>The writer resumes from the transaction id of the last record; an
+     * empty, missing, or never-written log starts at position 0.
+     *
+     * @return the writer interface to generate log records
+     * @throws IOException on error opening the underlying async writer
+     */
+    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
+        long position;
+        try {
+            position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId();
+            if (DistributedLogConstants.INVALID_TXID == position ||
+                DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) {
+                position = 0;
+            }
+        } catch (LogEmptyException | LogNotFoundException ex) {
+            // both cases mean there is nothing written yet: start from 0
+            position = 0;
+        }
+        return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position);
+    }
+
+    /**
+     * Get a reader to read a log stream as a sequence of bytes.
+     *
+     * @return the reader interface over the stream bytes
+     */
+    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
+        return new AppendOnlyStreamReader(this);
+    }
+
+    /**
+     * Begin writing to the log stream identified by the name.
+     *
+     * <p>Creates the write handler and acquires its lock synchronously; the
+     * writer is aborted if either step fails, so no lock is leaked.
+     *
+     * @return the writer interface to generate log records
+     */
+    @Override
+    public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
+        checkClosedOrInError("startLogSegmentNonPartitioned");
+        BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
+        boolean success = false;
+        try {
+            writer.createAndCacheWriteHandler();
+            BKLogWriteHandler writeHandler = writer.getWriteHandler();
+            FutureUtils.result(writeHandler.lockHandler());
+            success = true;
+            return writer;
+        } finally {
+            if (!success) {
+                writer.abort();
+            }
+        }
+    }
+
+    /**
+     * Begin writing to the log stream identified by the name.
+     *
+     * @return the writer interface to generate log records
+     */
+    @Override
+    public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
+        // synchronous wrapper over openAsyncLogWriter()
+        return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter());
+    }
+
+    /**
+     * Asynchronously open a log writer.
+     *
+     * <p>Pipeline: (1) create the locked write handler, (2) create the writer
+     * over it, (3) recover any incomplete log segments, (4) seed the writer's
+     * last transaction id from the recovery result; on recovery failure the
+     * writer is aborted so the lock is released.
+     */
+    @Override
+    public Future<AsyncLogWriter> openAsyncLogWriter() {
+        try {
+            checkClosedOrInError("startLogSegmentNonPartitioned");
+        } catch (AlreadyClosedException e) {
+            return Future.exception(e);
+        }
+
+        Future<BKLogWriteHandler> createWriteHandleFuture;
+        synchronized (this) {
+            // 1. create the locked write handler
+            createWriteHandleFuture = asyncCreateWriteHandler(true);
+        }
+        return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() {
+            @Override
+            public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) {
+                final BKAsyncLogWriter writer;
+                synchronized (BKDistributedLogManager.this) {
+                    // 2. create the writer with the handler
+                    writer = new BKAsyncLogWriter(
+                            conf,
+                            dynConf,
+                            BKDistributedLogManager.this,
+                            writeHandler,
+                            featureProvider,
+                            statsLogger);
+                }
+                // 3. recover the incomplete log segments
+                return writeHandler.recoverIncompleteLogSegments()
+                        .map(new AbstractFunction1<Long, AsyncLogWriter>() {
+                            @Override
+                            public AsyncLogWriter apply(Long lastTxId) {
+                                // 4. update last tx id if successfully recovered
+                                writer.setLastTxId(lastTxId);
+                                return writer;
+                            }
+                        }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
+                            @Override
+                            public BoxedUnit apply(Throwable cause) {
+                                // 5. close the writer if recovery failed
+                                writer.asyncAbort();
+                                return BoxedUnit.UNIT;
+                            }
+                        });
+            }
+        });
+    }
+
+    /**
+     * Find the DLSN of the first record whose transaction id is not less than
+     * {@code fromTxnId}: fetch the log segment list, then search it.
+     */
+    @Override
+    public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) {
+        return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() {
+            @Override
+            public Future<DLSN> apply(List<LogSegmentMetadata> segments) {
+                return getDLSNNotLessThanTxId(fromTxnId, segments);
+            }
+        });
+    }
+
+    /**
+     * Locate the candidate log segment for {@code fromTxnId} and search it.
+     *
+     * <p>With no segments, fall back to the last DLSN. A negative index from
+     * DLUtils.findLogSegmentNotLessThanTxnId — presumably meaning every segment
+     * starts after fromTxnId (TODO confirm against DLUtils) — positions at the
+     * beginning of the first segment.
+     */
+    private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
+                                                final List<LogSegmentMetadata> segments) {
+        if (segments.isEmpty()) {
+            return getLastDLSNAsync();
+        }
+        final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId);
+        if (segmentIdx < 0) {
+            return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L));
+        }
+        return getDLSNNotLessThanTxIdInSegment(
+                fromTxnId,
+                segmentIdx,
+                segments,
+                driver.getLogSegmentEntryStore(READER)
+        );
+    }
+
+    /**
+     * Search segment {@code segmentIdx} for the first record with transaction
+     * id >= {@code fromTxnId}; if not found, recurse into the next segment, or
+     * — on the last segment — derive the position from the last log record.
+     */
+    private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId,
+                                                         final int segmentIdx,
+                                                         final List<LogSegmentMetadata> segments,
+                                                         final LogSegmentEntryStore entryStore) {
+        final LogSegmentMetadata segment = segments.get(segmentIdx);
+        return ReadUtils.getLogRecordNotLessThanTxId(
+                name,
+                segment,
+                fromTxnId,
+                scheduler,
+                entryStore,
+                Math.max(2, dynConf.getReadAheadBatchSize())
+        ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() {
+            @Override
+            public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) {
+                if (foundRecord.isPresent()) {
+                    return Future.value(foundRecord.get().getDlsn());
+                }
+                if ((segments.size() - 1) == segmentIdx) {
+                    // last segment: position at the last record, or just past it
+                    // when even the last record's txid is below fromTxnId
+                    return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() {
+                        @Override
+                        public DLSN apply(LogRecordWithDLSN record) {
+                            if (record.getTransactionId() >= fromTxnId) {
+                                return record.getDlsn();
+                            }
+                            return record.getDlsn().getNextDLSN();
+                        }
+                    });
+                } else {
+                    return getDLSNNotLessThanTxIdInSegment(
+                            fromTxnId,
+                            segmentIdx + 1,
+                            segments,
+                            entryStore);
+                }
+            }
+        });
+    }
+
+    /**
+     * Get the input stream starting with fromTxnId for the specified log.
+     *
+     * @param fromTxnId - the first transaction id we want to read
+     * @return the stream starting with transaction fromTxnId
+     * @throws IOException if a stream cannot be found.
+     */
+    @Override
+    public LogReader getInputStream(long fromTxnId)
+        throws IOException {
+        return getInputStreamInternal(fromTxnId);
+    }
+
+    @Override
+    public LogReader getInputStream(DLSN fromDLSN) throws IOException {
+        // no transaction id hint when positioning by DLSN
+        return getInputStreamInternal(fromDLSN, Optional.<Long>absent());
+    }
+
+    @Override
+    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
+        // synchronous wrapper over openAsyncLogReader(long)
+        return FutureUtils.result(openAsyncLogReader(fromTxnId));
+    }
+
+    /**
+     * Opening a log reader positioning by transaction id <code>fromTxnId</code>.
+     *
+     * <p>
+     * - retrieve log segments for the stream
+     * - if the log segment list is empty, positioning by the last dlsn
+     * - otherwise, find the first log segment that contains the records whose transaction ids are not less than
+     *   the provided transaction id <code>fromTxnId</code>
+     *   - if all log segments' records' transaction ids are more than <code>fromTxnId</code>, positioning
+     *     on the first record.
+     *   - otherwise, search the log segment to find the log record
+     *     - if the log record is found, positioning the reader by that found record's dlsn
+     *     - otherwise, positioning by the last dlsn
+     * </p>
+     *
+     * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long)
+     * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LogSegmentEntryStore, int)
+     * @param fromTxnId
+     *          transaction id to start reading from
+     * @return future representing the open result.
+     */
+    @Override
+    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
+        final Promise<DLSN> dlsnPromise = new Promise<DLSN>();
+        getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() {
+
+            @Override
+            public void onSuccess(DLSN dlsn) {
+                dlsnPromise.setValue(dlsn);
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                // an empty log positions the reader at the very beginning;
+                // any other failure is propagated as-is
+                if (cause instanceof LogEmptyException) {
+                    dlsnPromise.setValue(DLSN.InitialDLSN);
+                } else {
+                    dlsnPromise.setException(cause);
+                }
+            }
+        });
+        // resolve the txid to a DLSN first, then open by DLSN
+        return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() {
+            @Override
+            public Future<AsyncLogReader> apply(DLSN dlsn) {
+                return openAsyncLogReader(dlsn);
+            }
+        });
+    }
+
+    @Override
+    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
+        // synchronous wrapper over openAsyncLogReader(DLSN)
+        return FutureUtils.result(openAsyncLogReader(fromDLSN));
+    }
+
+    /**
+     * Open an async reader positioned at {@code fromDLSN}, with no subscriber
+     * and no stream lock. The reader is tracked in pendingReaders so it can be
+     * cleaned up when the manager closes.
+     */
+    @Override
+    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
+        Optional<String> subscriberId = Optional.absent();
+        AsyncLogReader reader = new BKAsyncLogReader(
+                this,
+                scheduler,
+                fromDLSN,
+                subscriberId,
+                false,
+                statsLogger);
+        pendingReaders.add(reader);
+        return Future.value(reader);
+    }
+
+    /**
+     * Note the lock here is a sort of elective exclusive lock. I.e. acquiring this lock will only prevent other
+     * people who try to acquire the lock from reading from the stream. Normal readers (and writers) will not be
+     * blocked.
+     */
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) {
+        Optional<String> subscriberId = Optional.absent();
+        return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId);
+    }
+
+    /** Locked reader positioned at {@code fromDLSN} for the given subscriber. */
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) {
+        return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId));
+    }
+
+    /** Locked reader for a subscriber; position comes from the subscription store. */
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
+        Optional<DLSN> fromDLSN = Optional.absent();
+        return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId));
+    }
+
+    /**
+     * Core implementation of the locked-reader open path.
+     *
+     * <p>Creates the reader, acquires the stream lock, and - when no explicit start
+     * position was given - positions the reader at the subscriber's last commit
+     * position read from the subscription store. On any failure the reader is
+     * closed before the failure is surfaced to the caller.
+     *
+     * @param fromDLSN optional position to start reading from
+     * @param subscriberId optional subscriber id used to look up the last commit position
+     * @return future of the locked async log reader
+     */
+    protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN,
+                                                               final Optional<String> subscriberId) {
+        // at least one of start position / subscriber id must be supplied
+        if (!fromDLSN.isPresent() && !subscriberId.isPresent()) {
+            return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided."));
+        }
+        final BKAsyncLogReader reader = new BKAsyncLogReader(
+                BKDistributedLogManager.this,
+                scheduler,
+                fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN,
+                subscriberId,
+                false,
+                statsLogger);
+        // track the reader so it is released on manager close
+        pendingReaders.add(reader);
+        final Future<Void> lockFuture = reader.lockStream();
+        final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>(
+                new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable cause) {
+                // cancel the lock when the creation future is cancelled
+                lockFuture.cancel();
+                return BoxedUnit.UNIT;
+            }
+        });
+        // lock the stream - fetch the last commit position on success
+        lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() {
+            @Override
+            public Future<AsyncLogReader> apply(Void complete) {
+                // explicit start position given - no subscription-store lookup needed
+                if (fromDLSN.isPresent()) {
+                    return Future.value((AsyncLogReader) reader);
+                }
+                LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.",
+                        subscriberId.get(), name);
+                // we acquired lock
+                final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName());
+                return subscriptionsStore.getLastCommitPosition(subscriberId.get())
+                        .map(new ExceptionalFunction<DLSN, AsyncLogReader>() {
+                    @Override
+                    public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException {
+                        LOG.info("Reader {} @ {} positioned to last commit position {}.",
+                                new Object[] { subscriberId.get(), name, lastCommitPosition });
+                        // reposition the reader to the subscriber's last commit position
+                        reader.setStartDLSN(lastCommitPosition);
+                        return reader;
+                    }
+                });
+            }
+        }).addEventListener(new FutureEventListener<AsyncLogReader>() {
+            @Override
+            public void onSuccess(AsyncLogReader r) {
+                // reader ownership transfers to the caller; stop tracking it here
+                pendingReaders.remove(reader);
+                FutureUtils.setValue(createPromise, r);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                pendingReaders.remove(reader);
+                // close the reader first, then surface the original failure
+                reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply() {
+                        FutureUtils.setException(createPromise, cause);
+                        return BoxedUnit.UNIT;
+                    }
+                });
+            }
+        });
+        return createPromise;
+    }
+
+    /**
+     * Open a sync log reader that starts at transaction id <i>fromTxnId</i>.
+     * The transaction id is first mapped to a DLSN; an empty log maps to the
+     * initial DLSN.
+     *
+     * @param fromTxnId transaction id to start reading from
+     * @return log reader
+     * @throws IOException on error resolving the position or creating the reader
+     */
+    LogReader getInputStreamInternal(long fromTxnId)
+        throws IOException {
+        DLSN startPosition;
+        try {
+            startPosition = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId));
+        } catch (LogEmptyException lee) {
+            // nothing written yet - start from the very beginning
+            startPosition = DLSN.InitialDLSN;
+        }
+        return getInputStreamInternal(startPosition, Optional.of(fromTxnId));
+    }
+
+    /**
+     * Open a sync log reader starting at <i>fromDLSN</i>, with an optional
+     * transaction id the reader should start from.
+     *
+     * @param fromDLSN position to start reading from
+     * @param fromTxnId optional transaction id to start reading from
+     * @return log reader
+     * @throws IOException if the manager is closed or in error
+     */
+    LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
+            throws IOException {
+        LOG.info("Create sync reader starting from {}", fromDLSN);
+        checkClosedOrInError("getInputStream");
+        BKSyncLogReader syncReader = new BKSyncLogReader(
+                conf,
+                this,
+                fromDLSN,
+                fromTxnId,
+                statsLogger);
+        return syncReader;
+    }
+
+    /**
+     * Get the last log record in the stream, blocking until it is fetched.
+     *
+     * @return the last log record in the stream
+     * @throws java.io.IOException if a stream cannot be found.
+     */
+    @Override
+    public LogRecordWithDLSN getLastLogRecord() throws IOException {
+        checkClosedOrInError("getLastLogRecord");
+        Future<LogRecordWithDLSN> lastRecordFuture = getLastLogRecordAsync();
+        return FutureUtils.result(lastRecordFuture);
+    }
+
+    @Override
+    public long getFirstTxId() throws IOException {
+        checkClosedOrInError("getFirstTxId");
+        return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId();
+    }
+
+    /**
+     * Get the transaction id of the last record in the stream.
+     */
+    @Override
+    public long getLastTxId() throws IOException {
+        checkClosedOrInError("getLastTxId");
+        Future<Long> lastTxIdFuture = getLastTxIdAsync();
+        return FutureUtils.result(lastTxIdFuture);
+    }
+
+    @Override
+    public DLSN getLastDLSN() throws IOException {
+        checkClosedOrInError("getLastDLSN");
+        return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn();
+    }
+
+    /**
+     * Get the latest log record in the log asynchronously.
+     *
+     * @return future of the latest log record
+     */
+    @Override
+    public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
+        final boolean recover = false;
+        final boolean includeEndOfStream = false;
+        return getLastLogRecordAsyncInternal(recover, includeEndOfStream);
+    }
+
+    /**
+     * Fetch the last log record through a read handler operation.
+     *
+     * @param recover flag passed through to the read handler
+     * @param includeEndOfStream flag passed through to the read handler
+     * @return future of the last log record
+     */
+    private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
+                                                                    final boolean includeEndOfStream) {
+        Function<BKLogReadHandler, Future<LogRecordWithDLSN>> fetchLastRecord =
+                new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
+                    @Override
+                    public Future<LogRecordWithDLSN> apply(final BKLogReadHandler readHandler) {
+                        return readHandler.getLastLogRecordAsync(recover, includeEndOfStream);
+                    }
+                };
+        return processReaderOperation(fetchLastRecord);
+    }
+
+    /**
+     * Get the latest transaction id in the log asynchronously.
+     *
+     * @return future of the latest transaction id
+     */
+    @Override
+    public Future<Long> getLastTxIdAsync() {
+        Future<LogRecordWithDLSN> lastRecordFuture = getLastLogRecordAsyncInternal(false, false);
+        return lastRecordFuture.map(RECORD_2_TXID_FUNCTION);
+    }
+
+    /**
+     * Get the first DLSN in the log asynchronously.
+     *
+     * @return future of the first dlsn in the stream
+     */
+    @Override
+    public Future<DLSN> getFirstDLSNAsync() {
+        Future<LogRecordWithDLSN> firstRecordFuture = getFirstRecordAsyncInternal();
+        return firstRecordFuture.map(RECORD_2_DLSN_FUNCTION);
+    }
+
+    /**
+     * Fetch the first log record through a read handler operation.
+     */
+    private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
+        Function<BKLogReadHandler, Future<LogRecordWithDLSN>> fetchFirstRecord =
+                new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
+                    @Override
+                    public Future<LogRecordWithDLSN> apply(final BKLogReadHandler readHandler) {
+                        return readHandler.asyncGetFirstLogRecord();
+                    }
+                };
+        return processReaderOperation(fetchFirstRecord);
+    }
+
+    /**
+     * Get the latest DLSN in the log asynchronously.
+     *
+     * @return future of the latest dlsn
+     */
+    @Override
+    public Future<DLSN> getLastDLSNAsync() {
+        Future<LogRecordWithDLSN> lastRecordFuture = getLastLogRecordAsyncInternal(false, false);
+        return lastRecordFuture.map(RECORD_2_DLSN_FUNCTION);
+    }
+
+    /**
+     * Get the number of log records in the active portion of the log.
+     * Log segments that have already been truncated are not included.
+     *
+     * @return number of log records
+     * @throws IOException if the manager is closed or the count cannot be fetched
+     */
+    @Override
+    public long getLogRecordCount() throws IOException {
+        checkClosedOrInError("getLogRecordCount");
+        Future<Long> countFuture = getLogRecordCountAsync(DLSN.InitialDLSN);
+        return FutureUtils.result(countFuture);
+    }
+
+    /**
+     * Get the number of log records in the active portion of the log asynchronously.
+     * Any log segments that have already been truncated will not be included
+     *
+     * @return future number of log records
+     * @throws IOException
+     */
+    @Override
+    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
+        return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() {
+                    @Override
+                    public Future<Long> apply(BKLogReadHandler ledgerHandler) {
+                        return ledgerHandler.asyncGetLogRecordCount(beginDLSN);
+                    }
+                });
+    }
+
+    @Override
+    public void recover() throws IOException {
+        recoverInternal(conf.getUnpartitionedStreamName());
+    }
+
+    /**
+     * Recover a specified stream within the log container.
+     * The writer implicitly recovers a topic when it resumes writing; this method
+     * lets applications recover explicitly so a fully recovered log can be read
+     * before writes resume.
+     *
+     * @param streamIdentifier identifier of the stream to recover
+     * @throws IOException if the recovery fails
+     */
+    private void recoverInternal(String streamIdentifier) throws IOException {
+        checkClosedOrInError("recoverInternal");
+        BKLogWriteHandler writeHandler = createWriteHandler(true);
+        try {
+            FutureUtils.result(writeHandler.recoverIncompleteLogSegments());
+        } finally {
+            // release the handler even if recovery failed
+            Utils.closeQuietly(writeHandler);
+        }
+    }
+
+    /**
+     * Delete all the partitions of the specified log
+     *
+     * @throws IOException if the deletion fails
+     */
+    @Override
+    public void delete() throws IOException {
+        // delete the log through the writer-side metadata store and block
+        // until the deletion completes
+        FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+                .deleteLog(uri, getStreamName()));
+    }
+
+    /**
+     * The DistributedLogManager may archive/purge any logs for transactionId
+     * less than or equal to minImageTxId.
+     * This is to be used only when the client explicitly manages deletion. If
+     * the cleanup policy is based on sliding time window, then this method need
+     * not be called.
+     *
+     * @param minTxIdToKeep the earliest txid that must be retained
+     * @throws IOException if purging fails
+     */
+    @Override
+    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+        // use the template overload so the message is only formatted on failure
+        Preconditions.checkArgument(minTxIdToKeep > 0, "Invalid transaction id %s", minTxIdToKeep);
+        checkClosedOrInError("purgeLogSegmentsOlderThan");
+        BKLogWriteHandler ledgerHandler = createWriteHandler(true);
+        try {
+            LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep);
+            FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep));
+        } finally {
+            // release the write handler even if the purge fails
+            Utils.closeQuietly(ledgerHandler);
+        }
+    }
+
+    /**
+     * Tracks readers that have been handed out but not yet closed, so they can
+     * all be closed when the log manager shuts down.
+     */
+    static class PendingReaders implements AsyncCloseable {
+
+        final ExecutorService executorService;
+        // guarded by synchronized(this)
+        final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>();
+
+        PendingReaders(ExecutorService executorService) {
+            this.executorService = executorService;
+        }
+
+        public synchronized void remove(AsyncCloseable reader) {
+            readers.remove(reader);
+        }
+
+        public synchronized void add(AsyncCloseable reader) {
+            readers.add(reader);
+        }
+
+        @Override
+        public Future<Void> asyncClose() {
+            // snapshot under the same lock used by add/remove. The set holds
+            // AsyncCloseables, so copy into an AsyncCloseable[]: an
+            // AsyncLogReader[] would throw ArrayStoreException if any other
+            // closeable type were ever tracked here.
+            final AsyncCloseable[] pendingSnapshot;
+            synchronized (this) {
+                pendingSnapshot = readers.toArray(new AsyncCloseable[readers.size()]);
+            }
+            return Utils.closeSequence(executorService, true, pendingSnapshot)
+                    .onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
+                        @Override
+                        public BoxedUnit apply(Void value) {
+                            // clear under the lock to keep the set consistent
+                            synchronized (PendingReaders.this) {
+                                readers.clear();
+                            }
+                            return BoxedUnit.UNIT;
+                        }
+                    });
+        }
+    }
+
+    /**
+     * Close the distributed log manager, freeing any resources it may hold.
+     */
+    @Override
+    public Future<Void> asyncClose() {
+        Promise<Void> closeFuture;
+        BKLogReadHandler readHandlerToClose;
+        synchronized (this) {
+            // only the first caller creates the close promise; later callers
+            // observe the same future, so the close sequence runs once
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+            readHandlerToClose = readHandlerForListener;
+        }
+
+        // close in sequence: listener read handler, then outstanding readers,
+        // then the shared resources owned by this manager (if any)
+        Future<Void> closeResult = Utils.closeSequence(null, true,
+                readHandlerToClose,
+                pendingReaders,
+                resourcesCloseable.or(AsyncCloseable.NULL));
+        closeResult.proxyTo(closeFuture);
+        return closeFuture;
+    }
+
+    /**
+     * Synchronous close - blocks until {@link #asyncClose()} completes.
+     */
+    @Override
+    public void close() throws IOException {
+        Future<Void> closeFuture = asyncClose();
+        FutureUtils.result(closeFuture);
+    }
+
+    /**
+     * Render as {@code DLM:<uri>:<stream name>}.
+     */
+    @Override
+    public String toString() {
+        return "DLM:" + getUri() + ":" + getStreamName();
+    }
+
+    /**
+     * Raise an alert through the alert stats logger.
+     *
+     * @param msg alert message
+     * @param args arguments for the message
+     */
+    public void raiseAlert(String msg, Object... args) {
+        alertStatsLogger.raise(msg, args);
+    }
+
+    @Override
+    public SubscriptionsStore getSubscriptionsStore() {
+        return driver.getSubscriptionsStore(getStreamName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
new file mode 100644
index 0000000..0a4608e
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Ticker;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.util.Utils;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
+import static org.apache.distributedlog.util.DLUtils.validateName;
+
+/**
+ * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses
+ * zookeeper for metadata storage and bookkeeper for data storage.
+ * <h3>Metrics</h3>
+ *
+ * <h4>ZooKeeper Client</h4>
+ * See {@link ZooKeeperClient} for detail sub-stats.
+ * <ul>
+ * <li> `scope`/dlzk_factory_writer_shared/* : stats about the zookeeper client shared by all DL writers.
+ * <li> `scope`/dlzk_factory_reader_shared/* : stats about the zookeeper client shared by all DL readers.
+ * <li> `scope`/bkzk_factory_writer_shared/* : stats about the zookeeper client used by bookkeeper client
+ * shared by all DL writers.
+ * <li> `scope`/bkzk_factory_reader_shared/* : stats about the zookeeper client used by bookkeeper client
+ * shared by all DL readers.
+ * </ul>
+ *
+ * <h4>BookKeeper Client</h4>
+ * BookKeeper client stats are exposed directly to current scope. See {@link BookKeeperClient} for detail stats.
+ *
+ * <h4>Utils</h4>
+ * <ul>
+ * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used by this namespace.
+ * See {@link OrderedScheduler}.
+ * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead thread pool executor
+ * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}.
+ * <li> `scope`/writeLimiter/* : stats about the global write limiter used by this namespace.
+ * See {@link PermitLimiter}.
+ * </ul>
+ *
+ * <h4>DistributedLogManager</h4>
+ *
+ * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}.
+ */
+public class BKDistributedLogNamespace implements DistributedLogNamespace {
+    static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class);
+
+    private final String clientId;
+    private final int regionId;
+    private final DistributedLogConfiguration conf;
+    private final URI namespace;
+    // namespace driver
+    private final NamespaceDriver driver;
+    // resources
+    private final OrderedScheduler scheduler;
+    private final PermitLimiter writeLimiter;
+    private final AsyncFailureInjector failureInjector;
+    // log segment metadata store
+    private final LogSegmentMetadataCache logSegmentMetadataCache;
+    // feature provider
+    private final FeatureProvider featureProvider;
+    // Stats Loggers
+    private final StatsLogger statsLogger;
+    private final StatsLogger perLogStatsLogger;
+
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public BKDistributedLogNamespace(
+            DistributedLogConfiguration conf,
+            URI uri,
+            NamespaceDriver driver,
+            OrderedScheduler scheduler,
+            FeatureProvider featureProvider,
+            PermitLimiter writeLimiter,
+            AsyncFailureInjector failureInjector,
+            StatsLogger statsLogger,
+            StatsLogger perLogStatsLogger,
+            String clientId,
+            int regionId) {
+        this.conf = conf;
+        this.namespace = uri;
+        this.driver = driver;
+        this.scheduler = scheduler;
+        this.featureProvider = featureProvider;
+        this.writeLimiter = writeLimiter;
+        this.failureInjector = failureInjector;
+        this.statsLogger = statsLogger;
+        this.perLogStatsLogger = perLogStatsLogger;
+        this.clientId = clientId;
+        this.regionId = regionId;
+
+        // create a log segment metadata cache
+        this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker());
+    }
+
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return driver;
+    }
+
+    //
+    // Namespace Methods
+    //
+
+    /**
+     * Create the log named <i>logName</i> under this namespace.
+     */
+    @Override
+    public void createLog(String logName)
+            throws InvalidStreamNameException, IOException {
+        checkState();
+        validateName(logName);
+        URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName));
+        FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true));
+    }
+
+    /**
+     * Delete the log named <i>logName</i>. The log manager opened for the deletion
+     * is closed afterwards so its resources do not leak.
+     */
+    @Override
+    public void deleteLog(String logName)
+            throws InvalidStreamNameException, LogNotFoundException, IOException {
+        checkState();
+        validateName(logName);
+        Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        if (!uri.isPresent()) {
+            throw new LogNotFoundException("Log " + logName + " isn't found.");
+        }
+        DistributedLogManager dlm = openLogInternal(
+                uri.get(),
+                logName,
+                Optional.<DistributedLogConfiguration>absent(),
+                Optional.<DynamicDistributedLogConfiguration>absent());
+        try {
+            dlm.delete();
+        } finally {
+            // release resources held by the temporary log manager
+            dlm.close();
+        }
+    }
+
+    @Override
+    public DistributedLogManager openLog(String logName)
+            throws InvalidStreamNameException, IOException {
+        return openLog(logName,
+                Optional.<DistributedLogConfiguration>absent(),
+                Optional.<DynamicDistributedLogConfiguration>absent(),
+                Optional.<StatsLogger>absent());
+    }
+
+    @Override
+    public DistributedLogManager openLog(String logName,
+                                         Optional<DistributedLogConfiguration> logConf,
+                                         Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                         Optional<StatsLogger> perStreamStatsLogger)
+            throws InvalidStreamNameException, IOException {
+        checkState();
+        validateName(logName);
+        Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        if (!uri.isPresent()) {
+            throw new LogNotFoundException("Log " + logName + " isn't found.");
+        }
+        return openLogInternal(
+                uri.get(),
+                logName,
+                logConf,
+                dynamicLogConf);
+    }
+
+    @Override
+    public boolean logExists(String logName)
+        throws IOException, IllegalArgumentException {
+        checkState();
+        Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName));
+        // no location for the log means it does not exist
+        if (!uri.isPresent()) {
+            return false;
+        }
+        try {
+            FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
+                    .logExists(uri.get(), logName));
+            return true;
+        } catch (LogNotFoundException lnfe) {
+            return false;
+        }
+    }
+
+    @Override
+    public Iterator<String> getLogs() throws IOException {
+        checkState();
+        return FutureUtils.result(driver.getLogMetadataStore().getLogs());
+    }
+
+    @Override
+    public void registerNamespaceListener(NamespaceListener listener) {
+        driver.getLogMetadataStore().registerNamespaceListener(listener);
+    }
+
+    @Override
+    public synchronized AccessControlManager createAccessControlManager() throws IOException {
+        checkState();
+        return driver.getAccessControlManager();
+    }
+
+    /**
+     * Open the log in location <i>uri</i>.
+     *
+     * @param uri
+     *          location to store the log
+     * @param nameOfLogStream
+     *          name of the log
+     * @param logConfiguration
+     *          optional stream configuration
+     * @param dynamicLogConfiguration
+     *          dynamic stream configuration overrides.
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if the stream name is invalid
+     * @throws IOException
+     */
+    protected DistributedLogManager openLogInternal(
+            URI uri,
+            String nameOfLogStream,
+            Optional<DistributedLogConfiguration> logConfiguration,
+            Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration)
+        throws InvalidStreamNameException, IOException {
+        // Make sure the name is well formed
+        checkState();
+        validateName(nameOfLogStream);
+
+        DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration();
+        mergedConfiguration.addConfiguration(conf);
+        mergedConfiguration.loadStreamConf(logConfiguration);
+        // If dynamic config was not provided, default to a static view of the global configuration.
+        DynamicDistributedLogConfiguration dynConf = dynamicLogConfiguration.isPresent()
+                ? dynamicLogConfiguration.get()
+                : ConfUtils.getConstDynConf(mergedConfiguration);
+
+        return new BKDistributedLogManager(
+                nameOfLogStream,                    /* Log Name */
+                mergedConfiguration,                /* Configuration */
+                dynConf,                            /* Dynamic Configuration */
+                uri,                                /* Namespace URI */
+                driver,                             /* Namespace Driver */
+                logSegmentMetadataCache,            /* Log Segment Metadata Cache */
+                scheduler,                          /* DL scheduler */
+                clientId,                           /* Client Id */
+                regionId,                           /* Region Id */
+                writeLimiter,                       /* Write Limiter */
+                featureProvider.scope("dl"),        /* Feature Provider */
+                failureInjector,                    /* Failure Injector */
+                statsLogger,                        /* Stats Logger */
+                perLogStatsLogger,                  /* Per Log Stats Logger */
+                Optional.<AsyncCloseable>absent()   /* shared resources, we don't need to close any resources in dlm */
+        );
+    }
+
+    /**
+     * Check the namespace state.
+     *
+     * @throws IOException
+     */
+    private void checkState() throws IOException {
+        if (closed.get()) {
+            LOG.error("BK namespace {} is already closed", namespace);
+            throw new AlreadyClosedException("BK namespace " + namespace + " is already closed");
+        }
+    }
+
+    /**
+     * Close the distributed log manager factory, freeing any resources it may hold.
+     */
+    @Override
+    public void close() {
+        // only the first close call proceeds
+        if (!closed.compareAndSet(false, true)) {
+            return;
+        }
+        // shutdown the driver
+        Utils.close(driver);
+        // close the write limiter
+        this.writeLimiter.close();
+        // Shutdown the schedulers
+        SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(),
+                TimeUnit.MILLISECONDS);
+        LOG.info("Executor Service Stopped.");
+    }
+}