Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:18 UTC

[13/51] [partial] incubator-distributedlog git commit: DL-4: Repackage the source under apache namespace

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
deleted file mode 100644
index 5b788e2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-
-import javax.annotation.Nullable;
-
-/**
- * ZooKeeper Operation that plays with {@link org.apache.bookkeeper.versioning.Version}
- */
-public class ZKVersionedSetOp extends ZKOp {
-
-    private final OpListener<Version> listener;
-
-    public ZKVersionedSetOp(Op op,
-                            @Nullable OpListener<Version> opListener) {
-        super(op);
-        this.listener = opListener;
-    }
-
-    @Override
-    protected void commitOpResult(OpResult opResult) {
-        assert(opResult instanceof OpResult.SetDataResult);
-        OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult;
-        if (null != listener) {
-            listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion()));
-        }
-    }
-
-    @Override
-    protected void abortOpResult(Throwable t,
-                                 @Nullable OpResult opResult) {
-        Throwable cause;
-        if (null == opResult) {
-            cause = t;
-        } else {
-            assert (opResult instanceof OpResult.ErrorResult);
-            OpResult.ErrorResult errorResult = (OpResult.ErrorResult) opResult;
-            if (KeeperException.Code.OK.intValue() == errorResult.getErr()) {
-                cause = t;
-            } else {
-                cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr()));
-            }
-        }
-        if (null != listener) {
-            listener.onAbort(cause);
-        }
-    }
-
-}
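
For context on the class removed above (this change repackages it under the org.apache namespace), here is a minimal, hedged sketch of how such an op is typically constructed. The surrounding multi-op plumbing (ZKOp, the enclosing Transaction) is not shown in this hunk, so only the Op.setData call and the OpListener callbacks below are taken from the code itself:

    import org.apache.bookkeeper.versioning.Version;
    import org.apache.zookeeper.Op;
    import com.twitter.distributedlog.util.Transaction.OpListener;
    import com.twitter.distributedlog.zk.ZKVersionedSetOp;

    class VersionedSetOpExample {
        static ZKVersionedSetOp setDataOp(String path, byte[] data, int expectedZkVersion) {
            // Raw ZooKeeper setData op, guarded by the expected znode version.
            Op rawOp = Op.setData(path, data, expectedZkVersion);
            return new ZKVersionedSetOp(rawOp, new OpListener<Version>() {
                @Override
                public void onCommit(Version newVersion) {
                    // Called with the znode's new version (wrapped as a ZkVersion) once the multi-op commits.
                }
                @Override
                public void onAbort(Throwable cause) {
                    // Called with the KeeperException (or other failure) if the multi-op aborts.
                }
            });
        }
    }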

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
deleted file mode 100644
index 8ef33ea..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Watcher Manager to manage watchers.
- * <h3>Metrics</h3>
- * <ul>
- * <li> `total_watches`: total number of watches that managed by this watcher manager.
- * <li> `num_child_watches`: number of paths that are watched for children changes by this watcher manager.
- * </ul>
- */
-public class ZKWatcherManager implements Watcher {
-
-    static final Logger logger = LoggerFactory.getLogger(ZKWatcherManager.class);
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    public static class Builder {
-
-        private String _name;
-        private StatsLogger _statsLogger;
-        private ZooKeeperClient _zkc;
-
-        public Builder name(String name) {
-            this._name = name;
-            return this;
-        }
-
-        public Builder zkc(ZooKeeperClient zkc) {
-            this._zkc = zkc;
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this._statsLogger = statsLogger;
-            return this;
-        }
-
-        public ZKWatcherManager build() {
-            return new ZKWatcherManager(_name, _zkc, _statsLogger);
-        }
-    }
-
-    private final String name;
-    private final ZooKeeperClient zkc;
-    private final StatsLogger statsLogger;
-    // Gauges and their labels
-    private final Gauge<Number> totalWatchesGauge;
-    private static final String totalWatchesGauageLabel = "total_watches";
-    private final Gauge<Number> numChildWatchesGauge;
-    private static final String numChildWatchesGauageLabel = "num_child_watches";
-
-    protected final ConcurrentMap<String, Set<Watcher>> childWatches;
-    protected final AtomicInteger allWatchesGauge;
-
-    private ZKWatcherManager(String name,
-                             ZooKeeperClient zkc,
-                             StatsLogger statsLogger) {
-        this.name = name;
-        this.zkc = zkc;
-        this.statsLogger = statsLogger;
-
-        // watches
-        this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>();
-        this.allWatchesGauge = new AtomicInteger(0);
-
-        // stats
-        totalWatchesGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return allWatchesGauge.get();
-            }
-        };
-        this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge);
-
-        numChildWatchesGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return childWatches.size();
-            }
-        };
-
-        this.statsLogger.registerGauge(numChildWatchesGauageLabel, numChildWatchesGauge);
-    }
-
-    public Watcher registerChildWatcher(String path, Watcher watcher) {
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            Set<Watcher> newWatchers = new HashSet<Watcher>();
-            Set<Watcher> oldWatchers = childWatches.putIfAbsent(path, newWatchers);
-            watchers = (null == oldWatchers) ? newWatchers : oldWatchers;
-        }
-        synchronized (watchers) {
-            if (childWatches.get(path) == watchers) {
-                if (watchers.add(watcher)) {
-                    allWatchesGauge.incrementAndGet();
-                }
-            } else {
-                logger.warn("Watcher set for path {} has been changed while registering child watcher {}.",
-                        path, watcher);
-            }
-        }
-        return this;
-    }
-
-    public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) {
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            logger.warn("No watchers found on path {} while unregistering child watcher {}.",
-                    path, watcher);
-            return;
-        }
-        synchronized (watchers) {
-            if (watchers.remove(watcher)) {
-                allWatchesGauge.decrementAndGet();
-            } else {
-                logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
-            }
-            if (watchers.isEmpty()) {
-                // best-efforts to remove watches
-                try {
-                    if (null != zkc && removeFromServer) {
-                        zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    logger.debug("Successfully removed children watches from {}", path);
-                                } else {
-                                    logger.debug("Encountered exception on removing children watches from {}",
-                                            path, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    }
-                } catch (InterruptedException e) {
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                }
-                childWatches.remove(path, watchers);
-            }
-        }
-    }
-
-    public void unregisterGauges() {
-        this.statsLogger.unregisterGauge(totalWatchesGauageLabel, totalWatchesGauge);
-        this.statsLogger.unregisterGauge(numChildWatchesGauageLabel, numChildWatchesGauge);
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        switch (event.getType()) {
-            case None:
-                handleKeeperStateEvent(event);
-                break;
-            case NodeChildrenChanged:
-                handleChildWatchEvent(event);
-                break;
-            default:
-                break;
-        }
-    }
-
-    private void handleKeeperStateEvent(WatchedEvent event) {
-        Set<Watcher> savedAllWatches = new HashSet<Watcher>(allWatchesGauge.get());
-        for (Set<Watcher> watcherSet : childWatches.values()) {
-            synchronized (watcherSet) {
-                savedAllWatches.addAll(watcherSet);
-            }
-        }
-        for (Watcher watcher : savedAllWatches) {
-            watcher.process(event);
-        }
-    }
-
-    private void handleChildWatchEvent(WatchedEvent event) {
-        String path = event.getPath();
-        if (null == path) {
-            logger.warn("Received zookeeper watch event with null path : {}", event);
-            return;
-        }
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            return;
-        }
-        Set<Watcher> watchersToFire;
-        synchronized (watchers) {
-            watchersToFire = new HashSet<Watcher>(watchers.size());
-            watchersToFire.addAll(watchers);
-        }
-        for (Watcher watcher : watchersToFire) {
-            watcher.process(event);
-        }
-    }
-}
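
A short, hedged usage sketch of the watcher manager above (also being repackaged by this change); zkc, statsLogger and childWatcher are assumed to be supplied by the caller:

    import org.apache.bookkeeper.stats.StatsLogger;
    import org.apache.zookeeper.Watcher;
    import com.twitter.distributedlog.ZooKeeperClient;
    import com.twitter.distributedlog.zk.ZKWatcherManager;

    class WatcherManagerExample {
        static void watchChildren(ZooKeeperClient zkc, StatsLogger statsLogger, Watcher childWatcher) {
            ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
                    .name("example-watches")
                    .zkc(zkc)
                    .statsLogger(statsLogger)
                    .build();
            // The manager itself is the Watcher handed to ZooKeeper; child events fan out to
            // every watcher registered for the path.
            Watcher proxy = watcherManager.registerChildWatcher("/messaging/my-stream", childWatcher);
            // ... use `proxy` in getChildren(...) calls ...
            watcherManager.unregisterChildWatcher("/messaging/my-stream", childWatcher, true /* removeFromServer */);
            watcherManager.unregisterGauges();
        }
    }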

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
new file mode 100644
index 0000000..1d96f0e
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AppendOnlyStreamReader extends InputStream {
+    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class);
+
+    private LogRecordWithInputStream currentLogRecord = null;
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    private long currentPosition;
+    private static final int SKIP_BUFFER_SIZE = 512;
+
+    // Cache the input stream for a log record.
+    private static class LogRecordWithInputStream {
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN logRecord;
+
+        LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
+            Preconditions.checkNotNull(logRecord);
+
+            LOG.debug("Got record dlsn = {}, txid = {}, len = {}",
+                new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length});
+
+            this.logRecord = logRecord;
+            this.payloadStream = logRecord.getPayLoadInputStream();
+        }
+
+        InputStream getPayLoadInputStream() {
+            return payloadStream;
+        }
+
+        LogRecordWithDLSN getLogRecord() {
+            return logRecord;
+        }
+
+        // The last txid of the log record is the position of the next byte in the stream.
+        // Subtract length to get starting offset.
+        long getOffset() {
+            return logRecord.getTransactionId() - logRecord.getPayload().length;
+        }
+    }
+
+    /**
+     * Construct an input stream over the log stream.
+     *
+     * @param dlm the Distributed Log Manager to access the stream
+     */
+    AppendOnlyStreamReader(DistributedLogManager dlm)
+        throws IOException {
+        this.dlm = dlm;
+        reader = dlm.getInputStream(0);
+        currentPosition = 0;
+    }
+
+    /**
+     * Get input stream representing next entry in the
+     * ledger.
+     *
+     * @return input stream, or null if no more entries
+     */
+    private LogRecordWithInputStream nextLogRecord() throws IOException {
+        return nextLogRecord(reader);
+    }
+
+    private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+
+        if (null != record) {
+            return new LogRecordWithInputStream(record);
+        } else {
+            record = reader.readNext(false);
+            if (null != record) {
+                return new LogRecordWithInputStream(record);
+            } else {
+                LOG.debug("No record");
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        if (read(b, 0, 1) != 1) {
+            return -1;
+        } else {
+            return b[0] & 0xff; // mask to the 0..255 range required by InputStream#read()
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = 0;
+        if (currentLogRecord == null) {
+            currentLogRecord = nextLogRecord();
+            if (currentLogRecord == null) {
+                return read;
+            }
+        }
+
+        while (read < len) {
+            int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read));
+            if (thisread == -1) {
+                currentLogRecord = nextLogRecord();
+                if (currentLogRecord == null) {
+                    return read;
+                }
+            } else {
+                LOG.debug("Offset saved = {}, persisted = {}",
+                    currentPosition, currentLogRecord.getLogRecord().getTransactionId());
+                currentPosition += thisread;
+                read += thisread;
+            }
+        }
+        return read;
+    }
+
+    /**
+     * Position the reader at the given offset. Returns false if we fail to skip to the desired
+     * position without hitting the end of the stream.
+     *
+     * @throws org.apache.distributedlog.exceptions.EndOfStreamException if we attempt to
+     *         skip past the end of the stream.
+     */
+    public boolean skipTo(long position) throws IOException {
+
+        // No need to skip anywhere.
+        if (position == position()) {
+            return true;
+        }
+
+        LogReader skipReader = dlm.getInputStream(position);
+        LogRecordWithInputStream logRecord = null;
+        try {
+            logRecord = nextLogRecord(skipReader);
+        } catch (IOException ex) {
+            skipReader.close();
+            throw ex;
+        }
+
+        if (null == logRecord) {
+            return false;
+        }
+
+        // We may end up with a reader positioned *before* the requested position if
+        // we're near the tail and the writer is still active, or if the desired position
+        // is not at a log record payload boundary.
+        // Transaction ID gives us the starting position of the log record. Read ahead
+        // if necessary.
+        currentPosition = logRecord.getOffset();
+        currentLogRecord = logRecord;
+        LogReader oldReader = reader;
+        reader = skipReader;
+
+        // Close the old reader after swapping AppendOnlyStreamReader state. Close may fail
+        // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state.
+        oldReader.close();
+
+        byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE];
+        while (currentPosition < position) {
+            long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE);
+            long bytesRead = read(skipBuffer, 0, (int)bytesToRead);
+            if (bytesRead < bytesToRead) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public long position() {
+        return currentPosition;
+    }
+}
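
A minimal read-side sketch for the class above. It assumes the stream was produced by AppendOnlyStreamWriter (so transaction ids are byte offsets) and that the reader is obtained via DistributedLogManager#getAppendOnlyStreamReader(); that accessor is an assumption here rather than something shown in this hunk:

    import java.io.IOException;
    import org.apache.distributedlog.AppendOnlyStreamReader;
    import org.apache.distributedlog.DistributedLogManager;

    class AppendOnlyReadExample {
        static void readFromOffset(DistributedLogManager dlm) throws IOException {
            AppendOnlyStreamReader reader = dlm.getAppendOnlyStreamReader();
            // Skip the first 1 KB of the byte stream; false means the requested offset could not be reached.
            if (reader.skipTo(1024L)) {
                byte[] buf = new byte[4096];
                int n = reader.read(buf, 0, buf.length); // blocks until len bytes or end of stream
                System.out.println("read " + n + " bytes, now at byte offset " + reader.position());
            }
        }
    }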

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
new file mode 100644
index 0000000..8278c68
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AppendOnlyStreamWriter implements Closeable {
+    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class);
+
+    // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive
+    // type because synchronized block is needed anyway.
+    final long[] syncPos = new long[1];
+    BKAsyncLogWriter logWriter;
+    long requestPos = 0;
+
+    public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) {
+        LOG.debug("initialize at position {}", pos);
+        this.logWriter = logWriter;
+        this.syncPos[0] = pos;
+        this.requestPos = pos;
+    }
+
+    public Future<DLSN> write(byte[] data) {
+        requestPos += data.length;
+        Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
+        return writeResult.addEventListener(new WriteCompleteListener(requestPos));
+    }
+
+    public void force(boolean metadata) throws IOException {
+        long pos = 0;
+        try {
+            pos = Await.result(logWriter.flushAndCommit());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception ex) {
+            LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex);
+            throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex);
+        }
+        synchronized (syncPos) {
+            syncPos[0] = pos;
+        }
+    }
+
+    public long position() {
+        synchronized (syncPos) {
+            return syncPos[0];
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        logWriter.closeAndComplete();
+    }
+
+    public void markEndOfStream() throws IOException {
+        try {
+            Await.result(logWriter.markEndOfStream());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception ex) {
+            throw new UnexpectedException("Mark end of stream hit unexpected exception", ex);
+        }
+    }
+
+    class WriteCompleteListener implements FutureEventListener<DLSN> {
+        private final long position;
+        public WriteCompleteListener(long position) {
+            this.position = position;
+        }
+        @Override
+        public void onSuccess(DLSN response) {
+            synchronized (syncPos) {
+                if (position > syncPos[0]) {
+                    syncPos[0] = position;
+                }
+            }
+        }
+        @Override
+        public void onFailure(Throwable cause) {
+            // Handled at the layer above
+        }
+    }
+}
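
A minimal write-side sketch for the class above, assuming the writer is obtained via DistributedLogManager#getAppendOnlyStreamWriter() (not shown in this hunk). Note how write() advances requestPos by the payload length, so position() reports byte offsets rather than record counts:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.distributedlog.AppendOnlyStreamWriter;
    import org.apache.distributedlog.DistributedLogManager;

    class AppendOnlyWriteExample {
        static void writeAndSeal(DistributedLogManager dlm) throws IOException {
            AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
            writer.write("hello ".getBytes(StandardCharsets.UTF_8)); // async; requestPos += 6
            writer.write("world".getBytes(StandardCharsets.UTF_8));  // async; requestPos += 5
            writer.force(false);               // flush + commit; syncPos catches up to requestPos
            long durable = writer.position();  // byte offset known to be persisted (11 for a fresh stream)
            writer.markEndOfStream();          // seal the stream; readers will hit end-of-stream
            writer.close();
        }
    }

Because each record's transaction id is the running byte count, AppendOnlyStreamReader.skipTo() can later translate a byte position back into the transaction id it needs to open a reader.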

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
new file mode 100644
index 0000000..e3ace05
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public interface AsyncLogReader extends AsyncCloseable {
+
+    /**
+     * Get stream name that the reader reads from.
+     *
+     * @return stream name.
+     */
+    public String getStreamName();
+
+    /**
+     * Read the next record from the log stream
+     *
+     * @return A promise that when satisfied will contain the Log Record with its DLSN.
+     */
+    public Future<LogRecordWithDLSN> readNext();
+
+    /**
+     * Read the next <i>numEntries</i> entries. The future is only satisfied with a non-empty list
+     * of entries. It doesn't block until exactly <i>numEntries</i> entries are available; it is a
+     * best-effort call.
+     *
+     * @param numEntries
+     *          num entries
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
+
+    /**
+     * Read the next <i>numEntries</i> entries within the given <i>waitTime</i>.
+     * <p>
+     * The future is satisfied once <i>numEntries</i> entries have been read or <i>waitTime</i> has
+     * elapsed, whichever comes first. The only exception is when no new entries are written within
+     * <i>waitTime</i>; in that case the call waits until new entries are available.
+     *
+     * @param numEntries
+     *          max entries to return
+     * @param waitTime
+     *          maximum time to wait once there are entries available to read
+     * @param timeUnit
+     *          wait time unit
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
+}
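
A hedged sketch of driving the interface above; obtaining the AsyncLogReader from a DistributedLogManager is left to the caller (it is passed in as a parameter), and the Future combinators are the com.twitter.util ones already used elsewhere in this commit:

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import com.twitter.util.Await;
    import com.twitter.util.FutureEventListener;
    import org.apache.distributedlog.AsyncLogReader;
    import org.apache.distributedlog.LogRecordWithDLSN;

    class AsyncReadExample {
        static void readSome(AsyncLogReader reader) throws Exception {
            // Callback style: one record at a time.
            reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
                @Override
                public void onSuccess(LogRecordWithDLSN record) {
                    System.out.println("read " + record.getDlsn() + " txid=" + record.getTransactionId());
                }
                @Override
                public void onFailure(Throwable cause) {
                    // surface the error or retry
                }
            });
            // Blocking style: up to 100 records, waiting at most one second once entries are available.
            List<LogRecordWithDLSN> batch = Await.result(reader.readBulk(100, 1, TimeUnit.SECONDS));
        }
    }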

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
new file mode 100644
index 0000000..53b393b
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.util.List;
+
+public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
+
+    /**
+     * Get the last committed transaction id.
+     *
+     * @return last committed transaction id.
+     */
+    public long getLastTxId();
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @return A Future which contains a DLSN if the record was successfully written
+     * or an exception if the write fails
+     */
+    public Future<DLSN> write(LogRecord record);
+
+    /**
+     * Write log records to the stream in bulk. Each future in the list represents the result of
+     * one write operation. The size of the result list is equal to the size of the input list.
+     * Buffers are written in order, and the list of result futures has the same order.
+     *
+     * @param record list of log records
+     * @return A Future which contains a list of Future DLSNs if the record was successfully written
+     * or an exception if the operation fails.
+     */
+    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
+
+    /**
+     * Truncate the log until <i>dlsn</i>.
+     *
+     * @param dlsn
+     *          dlsn to truncate until.
+     * @return A Future that indicates whether the operation succeeded, or carries an exception
+     * if the truncation fails.
+     */
+    public Future<Boolean> truncate(DLSN dlsn);
+
+    /**
+     * Get the name of the stream this writer writes data to
+     */
+    public String getStreamName();
+}
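
A hedged sketch against the interface above; how the AsyncLogWriter is opened and how transaction ids are chosen are the caller's concern and are assumed here:

    import java.util.Arrays;
    import java.util.List;
    import com.twitter.util.Await;
    import com.twitter.util.Future;
    import org.apache.distributedlog.AsyncLogWriter;
    import org.apache.distributedlog.DLSN;
    import org.apache.distributedlog.LogRecord;

    class AsyncWriteExample {
        static void writeThenTruncate(AsyncLogWriter writer, long txid, byte[] payload) throws Exception {
            // Single write: the future yields the DLSN assigned to the record.
            DLSN first = Await.result(writer.write(new LogRecord(txid, payload)));
            // Bulk write: the outer future yields one future per record, in input order.
            List<Future<DLSN>> results = Await.result(
                    writer.writeBulk(Arrays.asList(new LogRecord(txid + 1, payload))));
            DLSN second = Await.result(results.get(0));
            // Truncate the log until the first DLSN written above.
            boolean truncated = Await.result(writer.truncate(first));
        }
    }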

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
new file mode 100644
index 0000000..c12bd10
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+public interface AsyncNotification {
+    /**
+     * Triggered when the background activity encounters an exception.
+     *
+     * @param reason the exception that was encountered.
+     */
+    void notifyOnError(Throwable reason);
+
+    /**
+     *  Triggered when the background activity completes an operation
+     */
+    void notifyOnOperationComplete();
+}
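
A trivial, hedged sketch of one possible implementation of the callback contract above; where the notification gets registered is internal to the reader/writer implementations and is not assumed here:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.distributedlog.AsyncNotification;

    class LatchNotification implements AsyncNotification {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        @Override
        public void notifyOnError(Throwable reason) {
            error.set(reason);   // remember why the background activity failed
            latch.countDown();
        }

        @Override
        public void notifyOnOperationComplete() {
            latch.countDown();   // one background operation finished
        }
    }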

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
new file mode 100644
index 0000000..4a2ef30
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.io.Abortable;
+import org.apache.distributedlog.io.Abortables;
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
+    static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
+
+    protected final DistributedLogConfiguration conf;
+    private final DynamicDistributedLogConfiguration dynConf;
+    protected final BKDistributedLogManager bkDistributedLogManager;
+
+    // States
+    private Promise<Void> closePromise = null;
+    private volatile boolean forceRolling = false;
+    private boolean forceRecovery = false;
+
+    // Truncation Related
+    private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
+    @VisibleForTesting
+    private Long minTimestampToKeepOverride = null;
+
+    // Log Segment Writers
+    protected BKLogSegmentWriter segmentWriter = null;
+    protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
+    protected BKLogSegmentWriter allocatedSegmentWriter = null;
+    protected BKLogWriteHandler writeHandler = null;
+
+    BKAbstractLogWriter(DistributedLogConfiguration conf,
+                        DynamicDistributedLogConfiguration dynConf,
+                        BKDistributedLogManager bkdlm) {
+        this.conf = conf;
+        this.dynConf = dynConf;
+        this.bkDistributedLogManager = bkdlm;
+        LOG.debug("Initial retention period for {} : {}", bkdlm.getStreamName(),
+                TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS));
+    }
+
+    // manage write handler
+
+    synchronized protected BKLogWriteHandler getCachedWriteHandler() {
+        return writeHandler;
+    }
+
+    protected BKLogWriteHandler getWriteHandler() throws IOException {
+        BKLogWriteHandler writeHandler = createAndCacheWriteHandler();
+        writeHandler.checkMetadataException();
+        return writeHandler;
+    }
+
+    protected BKLogWriteHandler createAndCacheWriteHandler()
+            throws IOException {
+        synchronized (this) {
+            if (writeHandler != null) {
+                return writeHandler;
+            }
+        }
+        // This code path will be executed when the handler is not set or has been closed
+        // due to forceRecovery during testing
+        BKLogWriteHandler newHandler =
+                FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
+        boolean success = false;
+        try {
+            synchronized (this) {
+                if (writeHandler == null) {
+                    writeHandler = newHandler;
+                    success = true;
+                }
+                return writeHandler;
+            }
+        } finally {
+            if (!success) {
+                newHandler.asyncAbort();
+            }
+        }
+    }
+
+    // manage log segment writers
+
+    protected synchronized BKLogSegmentWriter getCachedLogWriter() {
+        return segmentWriter;
+    }
+
+    protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
+        return segmentWriterFuture;
+    }
+
+    protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) {
+        this.segmentWriter = logWriter;
+        this.segmentWriterFuture = Future.value(logWriter);
+    }
+
+    protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
+        try {
+            return segmentWriter;
+        } finally {
+            segmentWriter = null;
+            segmentWriterFuture = null;
+        }
+    }
+
+    protected synchronized BKLogSegmentWriter getAllocatedLogWriter() {
+        return allocatedSegmentWriter;
+    }
+
+    protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) {
+        this.allocatedSegmentWriter = logWriter;
+    }
+
+    protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
+        try {
+            return allocatedSegmentWriter;
+        } finally {
+            allocatedSegmentWriter = null;
+        }
+    }
+
+    private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
+        BKLogSegmentWriter segmentWriter = getCachedLogWriter();
+        BKLogWriteHandler writeHandler = getCachedWriteHandler();
+        if (null != segmentWriter && null != writeHandler) {
+            cancelTruncation();
+            Promise<Void> completePromise = new Promise<Void>();
+            asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow);
+            return completePromise;
+        } else {
+            return closeNoThrow();
+        }
+    }
+
+    private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
+                                       final BKLogWriteHandler writeHandler,
+                                       final Promise<Void> completePromise,
+                                       final boolean shouldThrow) {
+        writeHandler.completeAndCloseLogSegment(segmentWriter)
+                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+                    @Override
+                    public void onSuccess(LogSegmentMetadata segment) {
+                        removeCachedLogWriter();
+                        complete(null);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        LOG.error("Completing Log segments encountered exception", cause);
+                        complete(cause);
+                    }
+
+                    private void complete(final Throwable cause) {
+                        closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
+                            @Override
+                            public BoxedUnit apply() {
+                                if (null != cause && shouldThrow) {
+                                    FutureUtils.setException(completePromise, cause);
+                                } else {
+                                    FutureUtils.setValue(completePromise, null);
+                                }
+                                return BoxedUnit.UNIT;
+                            }
+                        });
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    void closeAndComplete() throws IOException {
+        FutureUtils.result(asyncCloseAndComplete(true));
+    }
+
+    protected Future<Void> asyncCloseAndComplete() {
+        return asyncCloseAndComplete(true);
+    }
+
+    @Override
+    public void close() throws IOException {
+        FutureUtils.result(asyncClose());
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return asyncCloseAndComplete(false);
+    }
+
+    /**
+     * Close the writer and release all the underlying resources
+     */
+    protected Future<Void> closeNoThrow() {
+        Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        cancelTruncation();
+        Utils.closeSequence(bkDistributedLogManager.getScheduler(),
+                true, /** ignore close errors **/
+                getCachedLogWriter(),
+                getAllocatedLogWriter(),
+                getCachedWriteHandler()
+        ).proxyTo(closeFuture);
+        return closeFuture;
+    }
+
+    @Override
+    public void abort() throws IOException {
+        FutureUtils.result(asyncAbort());
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        Promise<Void> closeFuture;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            closeFuture = closePromise = new Promise<Void>();
+        }
+        cancelTruncation();
+        Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
+                getCachedLogWriter(),
+                getAllocatedLogWriter(),
+                getCachedWriteHandler()).proxyTo(closeFuture);
+        return closeFuture;
+    }
+
+    // used by sync writer
+    protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
+                                                 final boolean allowMaxTxID)
+            throws IOException {
+        Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
+        BKLogSegmentWriter logSegmentWriter = null;
+        if (null != logSegmentWriterFuture) {
+            logSegmentWriter = FutureUtils.result(logSegmentWriterFuture);
+        }
+        if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) {
+            logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary(
+                    logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID));
+        }
+        return logSegmentWriter;
+    }
+
+    // used by async writer
+    synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
+        final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
+        Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
+        if (null == ledgerWriterFuture || null == ledgerWriter) {
+            return null;
+        }
+
+        // Handle the case where the last call to write actually caused an error in the log
+        if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
+            // Close the ledger writer so that we will recover and start a new log segment
+            Future<Void> closeFuture;
+            if (ledgerWriter.isLogSegmentInError()) {
+                closeFuture = ledgerWriter.asyncAbort();
+            } else {
+                closeFuture = ledgerWriter.asyncClose();
+            }
+            return closeFuture.flatMap(
+                    new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
+                @Override
+                public Future<BKLogSegmentWriter> apply(Void result) {
+                    removeCachedLogWriter();
+
+                    if (ledgerWriter.isLogSegmentInError()) {
+                        return Future.value(null);
+                    }
+
+                    BKLogWriteHandler writeHandler;
+                    try {
+                        writeHandler = getWriteHandler();
+                    } catch (IOException e) {
+                        return Future.exception(e);
+                    }
+                    if (null != writeHandler && forceRecovery) {
+                        return writeHandler.completeAndCloseLogSegment(ledgerWriter)
+                                .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
+                            @Override
+                            public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
+                                return null;
+                            }
+                        });
+                    } else {
+                        return Future.value(null);
+                    }
+                }
+            });
+        } else {
+            return ledgerWriterFuture;
+        }
+    }
+
+    boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException {
+        BKLogWriteHandler writeHandler = getWriteHandler();
+        return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling;
+    }
+
+    private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) {
+        boolean truncationEnabled = false;
+
+        long minTimestampToKeep = 0;
+
+        long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
+        if (retentionPeriodInMillis > 0) {
+            minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis;
+            truncationEnabled = true;
+        }
+
+        if (null != minTimestampToKeepOverride) {
+            minTimestampToKeep = minTimestampToKeepOverride;
+            truncationEnabled = true;
+        }
+
+        // skip scheduling a new purge if a truncation task is already running
+        // (i.e. lastTruncationAttempt exists and is not yet satisfied)
+        synchronized (this) {
+            if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
+                lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
+            }
+        }
+    }
+
+    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
+                                                               final long startTxId,
+                                                               final boolean allowMaxTxID) {
+        return writeHandler.recoverIncompleteLogSegments()
+                .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
+            @Override
+            public Future<BKLogSegmentWriter> apply(Long lastTxId) {
+                return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
+                        .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) {
+                        cacheLogWriter(newSegmentWriter);
+                        return BoxedUnit.UNIT;
+                    }
+                });
+            }
+        });
+    }
+
+    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
+            final BKLogSegmentWriter oldSegmentWriter,
+            final BKLogWriteHandler writeHandler,
+            final long startTxId,
+            final boolean bestEffort,
+            final boolean allowMaxTxID) {
+        final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
+        if (switchPermit.isAllowed()) {
+            return closeOldLogSegmentAndStartNewOne(
+                    oldSegmentWriter,
+                    writeHandler,
+                    startTxId,
+                    bestEffort,
+                    allowMaxTxID
+            ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
+                @Override
+                public Future<BKLogSegmentWriter> apply(Throwable cause) {
+                    if (cause instanceof LockingException) {
+                        LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
+                                writeHandler.getFullyQualifiedName(), cause);
+                        bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+                        return Future.value(oldSegmentWriter);
+                    } else if (cause instanceof ZKException) {
+                        ZKException zke = (ZKException) cause;
+                        if (ZKException.isRetryableZKException(zke)) {
+                            LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
+                                    " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
+                                    zke.getKeeperExceptionCode());
+                            bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+                            return Future.value(oldSegmentWriter);
+                        }
+                    }
+                    return Future.exception(cause);
+                }
+            }).ensure(new AbstractFunction0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    bkDistributedLogManager.getLogSegmentRollingPermitManager()
+                            .releasePermit(switchPermit);
+                    return BoxedUnit.UNIT;
+                }
+            });
+        } else {
+            bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
+            return Future.value(oldSegmentWriter);
+        }
+    }
+
+    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
+            final BKLogSegmentWriter oldSegmentWriter,
+            final BKLogWriteHandler writeHandler,
+            final long startTxId,
+            final boolean bestEffort,
+            final boolean allowMaxTxID) {
+        // we switch only when we could allocate a new log segment.
+        BKLogSegmentWriter newSegmentWriter = getAllocatedLogWriter();
+        if (null == newSegmentWriter) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Allocating a new log segment from {} for {}.", startTxId,
+                        writeHandler.getFullyQualifiedName());
+            }
+            return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
+                    .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+                        @Override
+                        public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
+                            if (null == newSegmentWriter) {
+                                if (bestEffort) {
+                                    return Future.value(oldSegmentWriter);
+                                } else {
+                                    return Future.exception(
+                                            new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
+                                }
+                            }
+                            cacheAllocatedLogWriter(newSegmentWriter);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Allocated a new log segment from {} for {}.", startTxId,
+                                        writeHandler.getFullyQualifiedName());
+                            }
+                            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter);
+                        }
+                    });
+        } else {
+            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter);
+        }
+    }
+
+    private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
+            BKLogSegmentWriter oldSegmentWriter,
+            final BKLogSegmentWriter newSegmentWriter) {
+        final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
+        // complete the old log segment
+        writeHandler.completeAndCloseLogSegment(oldSegmentWriter)
+                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+
+                    @Override
+                    public void onSuccess(LogSegmentMetadata value) {
+                        cacheLogWriter(newSegmentWriter);
+                        removeAllocatedLogWriter();
+                        FutureUtils.setValue(completePromise, newSegmentWriter);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        FutureUtils.setException(completePromise, cause);
+                    }
+                });
+        return completePromise;
+    }
+
+    synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
+            final BKLogSegmentWriter segmentWriter,
+            long startTxId,
+            boolean bestEffort,
+            boolean allowMaxTxID) {
+        final BKLogWriteHandler writeHandler;
+        try {
+            writeHandler = getWriteHandler();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
+        Future<BKLogSegmentWriter> rollPromise;
+        if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) {
+            rollPromise = closeOldLogSegmentAndStartNewOneWithPermit(
+                    segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID);
+        } else if (null == segmentWriter) {
+            rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID);
+        } else {
+            rollPromise = Future.value(segmentWriter);
+        }
+        return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
+            @Override
+            public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) {
+                if (segmentWriter == newSegmentWriter) {
+                    return newSegmentWriter;
+                }
+                truncateLogSegmentsIfNecessary(writeHandler);
+                return newSegmentWriter;
+            }
+        });
+    }
+
+    protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
+        if (null != closePromise) {
+            LOG.error("Executing " + operation + " on already closed Log Writer");
+            throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer");
+        }
+    }
+
+    @VisibleForTesting
+    public void setForceRolling(boolean forceRolling) {
+        this.forceRolling = forceRolling;
+    }
+
+    @VisibleForTesting
+    public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) {
+        this.minTimestampToKeepOverride = minTimestampToKeepOverride;
+    }
+
+    protected synchronized void cancelTruncation() {
+        if (null != lastTruncationAttempt) {
+            FutureUtils.cancel(lastTruncationAttempt);
+            lastTruncationAttempt = null;
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void setForceRecovery(boolean forceRecovery) {
+        this.forceRecovery = forceRecovery;
+    }
+
+}
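
createAndCacheWriteHandler() above builds the handler outside the lock, publishes it under the lock only if no other thread got there first, and aborts the handler it built if it lost the race. A generic, hedged restatement of that pattern (the names below are placeholders, not DistributedLog APIs):

    import java.io.IOException;

    abstract class CachedHandlerExample<T> {
        private T handler;

        T getOrCreate() throws IOException {
            synchronized (this) {
                if (handler != null) {
                    return handler;        // fast path: already published
                }
            }
            T fresh = create();            // slow path: potentially expensive, done unlocked
            boolean installed = false;
            try {
                synchronized (this) {
                    if (handler == null) {
                        handler = fresh;   // we won the race
                        installed = true;
                    }
                    return handler;        // ours, or the winner's
                }
            } finally {
                if (!installed) {
                    release(fresh);        // we lost: clean up the one we built
                }
            }
        }

        abstract T create() throws IOException;

        abstract void release(T handler);
    }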