You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:09:59 UTC

[2/2] incubator-distributedlog git commit: DL-97: Remove unused methods in BKLogHandler

DL-97: Remove unused methods in BKLogHandler

Merge the code change that removes unused methods in BKLogHandler.

Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>
Author: Leigh Stewart <ls...@twitter.com>
Author: Jordan Bull <jb...@twitter.com>
Author: Dave Rusek <da...@gmail.com>
Author: Dave Rusek <dr...@twitter.com>

Reviewers: Leigh Stewart <ls...@apache.org>

Closes #69 from sijie/merge/DL-97


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/74a33029
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/74a33029
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/74a33029

Branch: refs/heads/master
Commit: 74a33029cc79f3a770ad6c016a42ff4ff7ad71c2
Parents: 5b55bee
Author: Sijie Guo <si...@apache.org>
Authored: Wed Dec 21 00:09:57 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Wed Dec 21 00:09:57 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    |  15 +-
 .../BKDistributedLogNamespace.java              |  20 +-
 .../twitter/distributedlog/BKLogHandler.java    | 140 +----
 .../distributedlog/BKLogSegmentWriter.java      |  12 +-
 .../distributedlog/BookKeeperClient.java        |  48 +-
 .../DistributedLogConfiguration.java            |   8 +-
 .../impl/ZKLogSegmentMetadataStore.java         |  94 +++-
 .../stats/BroadCastStatsLogger.java             |  22 +
 .../impl/TestZKLogSegmentMetadataStore.java     |  22 +-
 .../service/DistributedLogServiceImpl.java      |  27 +-
 .../service/stream/StreamImpl.java              | 550 +++++++------------
 .../service/stream/StreamManager.java           |   5 +-
 .../service/stream/StreamManagerImpl.java       |  15 +-
 .../service/TestDistributedLogService.java      |  20 +-
 pom.xml                                         |   2 +-
 15 files changed, 370 insertions(+), 630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 7d3d53d..b1a9273 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1;
  * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
  * </ul>
  */
-class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, AsyncNotification {
+class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
 
     private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
@@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler bkLedgerManager;
-    private Watcher sessionExpireWatcher = null;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.executorService = executorService;
         this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
                 lockStateExecutor, this, deserializeRecordSet, true);
-        sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
     }
 
-    @Override
-    public void notifySessionExpired() {
-        // ZK Session notification is an indication to check if this has resulted in a fatal error
-        // of the underlying reader, in itself this reader doesnt error out unless the underlying
-        // reader has hit an error
-        scheduleBackgroundRead();
-    }
-
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
             // Dont run the task more than once every seconds (for sanity)
@@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
         cancelAllPendingReads(exception);
 
-        bkLedgerManager.unregister(sessionExpireWatcher);
-
         FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
         return closePromise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0b522d0..2df1046 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -75,6 +75,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
@@ -252,6 +253,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         }
     }
 
+    private static String getHostIpLockClientId() {
+        try {
+            return InetAddress.getLocalHost().toString();
+        } catch(Exception ex) {
+            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
+        }
+    }
+
     private final String clientId;
     private final int regionId;
     private final DistributedLogConfiguration conf;
@@ -326,9 +335,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         this.featureProvider = featureProvider;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = perLogStatsLogger;
-        this.clientId = clientId;
         this.regionId = regionId;
         this.bkdlConfig = bkdlConfig;
+        if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
+            this.clientId = getHostIpLockClientId();
+        } else {
+            this.clientId = clientId;
+        }
 
         // Build resources
         StatsLogger schedulerStatsLogger = statsLogger.scope("factory").scope("thread_pool");
@@ -622,13 +635,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                                                                   DistributedLogConfiguration conf,
                                                                   String zkServers,
                                                                   StatsLogger statsLogger) {
-        RetryPolicy retryPolicy = null;
-        if (conf.getZKNumRetries() > 0) {
-            retryPolicy = new BoundExponentialBackoffRetryPolicy(
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                     conf.getBKClientZKRetryBackoffStartMillis(),
                     conf.getBKClientZKRetryBackoffMaxMillis(),
                     conf.getBKClientZKNumRetries());
-        }
         ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
                 .name(zkcName)
                 .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a6ec318..460de11 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -32,9 +32,7 @@ import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentCache;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -56,7 +54,6 @@ import scala.runtime.AbstractFunction0;
 import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -104,8 +101,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbortable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
 
-    private static final int LAYOUT_VERSION = -1;
-
     protected final ZKLogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
     protected final ZooKeeperClient zooKeeperClient;
@@ -274,12 +269,7 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
         this.bookKeeperClient = bkcBuilder.build();
         this.metadataStore = metadataStore;
-
-        if (lockClientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
-            this.lockClientId = getHostIpLockClientId();
-        } else {
-            this.lockClientId = lockClientId;
-        }
+        this.lockClientId = lockClientId;
 
         this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
                 .registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
@@ -316,14 +306,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return lockClientId;
     }
 
-    private String getHostIpLockClientId() {
-        try {
-            return InetAddress.getLocalHost().toString();
-        } catch(Exception ex) {
-            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
-        }
-    }
-
     protected void registerListener(LogSegmentListener listener) {
         listeners.add(listener);
     }
@@ -472,57 +454,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         }
     }
 
-    public LogRecordWithDLSN getLastLogRecord(boolean recover, boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        List<LogSegmentMetadata> ledgerList = getFullLedgerListDesc(true, true);
-
-        for (LogSegmentMetadata metadata: ledgerList) {
-            LogRecordWithDLSN record = recoverLastRecordInLedger(metadata, recover, false, includeEndOfStream);
-
-            if (null != record) {
-                assert(!record.isControl());
-                LOG.debug("{} getLastLogRecord Returned {}", getFullyQualifiedName(), record);
-                return record;
-            }
-        }
-
-        throw new LogEmptyException("Log " + getFullyQualifiedName() + " has no records");
-    }
-
-    public long getLastTxId(boolean recover,
-                            boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        return getLastLogRecord(recover, includeEndOfStream).getTransactionId();
-    }
-
-    public DLSN getLastDLSN(boolean recover,
-                            boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        return getLastLogRecord(recover, includeEndOfStream).getDlsn();
-    }
-
-    public long getLogRecordCount() throws IOException {
-        try {
-            checkLogStreamExists();
-        } catch (LogNotFoundException exc) {
-            return 0;
-        }
-
-        List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, false);
-        long count = 0;
-        for (LogSegmentMetadata l : ledgerList) {
-            if (l.isInProgress()) {
-                LogRecord record = recoverLastRecordInLedger(l, false, false, false);
-                if (null != record) {
-                    count += record.getLastPositionWithinLogSegment();
-                }
-            } else {
-                count += l.getRecordCount();
-            }
-        }
-        return count;
-    }
-
     private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
         final LedgerHandleCache handleCache =
                 LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
@@ -634,15 +565,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return sum;
     }
 
-    public long getFirstTxId() throws IOException {
-        checkLogStreamExists();
-        List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, true);
-
-        // The ledger list should at least have one element
-        // First TxId is populated even for in progress ledgers
-        return ledgerList.get(0).getFirstTxId();
-    }
-
     Future<Void> checkLogStreamExistsAsync() {
         final Promise<Void> promise = new Promise<Void>();
         try {
@@ -685,54 +607,11 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return promise;
     }
 
-    private void checkLogStreamExists() throws IOException {
-        try {
-            if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath())
-                    .exists(logMetadata.getLogSegmentsPath(), false)) {
-                throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist");
-            }
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
-            throw new DLInterruptedException("Interrupted while checking "
-                    + logMetadata.getLogSegmentsPath(), ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke);
-            throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke);
-        }
-    }
-
     @Override
     public Future<Void> asyncAbort() {
         return asyncClose();
     }
 
-    /**
-     * Find the id of the last edit log transaction written to a edit log
-     * ledger.
-     */
-    protected Pair<Long, DLSN> readLastTxIdInLedger(LogSegmentMetadata l) throws IOException {
-        LogRecordWithDLSN record = recoverLastRecordInLedger(l, false, false, true);
-
-        if (null == record) {
-            return Pair.of(DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID, DLSN.InvalidDLSN);
-        }
-        else {
-            return Pair.of(record.getTransactionId(), record.getDlsn());
-        }
-    }
-
-    /**
-     * Find the id of the last edit log transaction written to a edit log
-     * ledger.
-     */
-    protected LogRecordWithDLSN recoverLastRecordInLedger(LogSegmentMetadata l,
-                                                          boolean fence,
-                                                          boolean includeControl,
-                                                          boolean includeEndOfStream)
-        throws IOException {
-        return FutureUtils.result(asyncReadLastRecord(l, fence, includeControl, includeEndOfStream));
-    }
-
     public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
         return asyncReadLastRecord(l, false, false, false);
     }
@@ -1293,21 +1172,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         }
     }
 
-    // ZooKeeper Watchers
-
-    Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) {
-        if (conf.getZKNumRetries() > 0) {
-            return new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    // nop
-                }
-            };
-        }
-        return zooKeeperClient.registerExpirationHandler(onExpired);
-    }
-
-    boolean unregister(Watcher watcher) {
-        return zooKeeperClient.unregister(watcher);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 1b52951..8276125 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -156,8 +156,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
 
     // stats
     private final StatsLogger envelopeStatsLogger;
+    private final StatsLogger transmitOutstandingLogger;
     private final Counter transmitDataSuccesses;
     private final Counter transmitDataMisses;
+    private final Gauge<Number> transmitOutstandingGauge;
     private final OpStatsLogger transmitDataPacketSize;
     private final Counter transmitControlSuccesses;
     private final Counter pFlushSuccesses;
@@ -255,8 +257,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
         pendingWrites = segWriterStatsLogger.getCounter("pending");
 
         // outstanding transmit requests
-        StatsLogger transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
-        transmitOutstandingLogger.registerGauge("requests", new Gauge<Number>() {
+        transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
+        transmitOutstandingGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -265,7 +267,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             public Number getSample() {
                 return outstandingTransmits.get();
             }
-        });
+        };
+        transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
 
         outstandingTransmits = new AtomicInteger(0);
         this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
@@ -531,6 +534,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // remove stats
+        this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+
         // Cancel the periodic keep alive schedule first
         if (null != periodicKeepAliveSchedule) {
             if (!periodicKeepAliveSchedule.cancel(false)) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index fd22b8f..c39ae4c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import com.google.common.base.Optional;
 import com.twitter.distributedlog.ZooKeeperClient.Credentials;
 import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Optional;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8;
  * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper}
  * </ul>
  */
-public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier {
+public class BookKeeperClient {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
 
     // Parameters to build bookkeeper client
@@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
     // feature provider
     private final Optional<FeatureProvider> featureProvider;
 
-    private Watcher sessionExpireWatcher = null;
-    private AtomicBoolean zkSessionExpired = new AtomicBoolean(false);
-
     @SuppressWarnings("deprecation")
     private synchronized void commonInitialization(
             DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer,
-            boolean registerExpirationHandler)
+            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer)
         throws IOException, InterruptedException, KeeperException {
         ClientConfiguration bkConfig = new ClientConfiguration();
         bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             .requestTimer(requestTimer)
             .featureProvider(featureProvider.orNull())
             .build();
-
-        if (registerExpirationHandler) {
-            sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
-        }
     }
 
     BookKeeperClient(DistributedLogConfiguration conf,
@@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         if (null != this.bkc) {
             return;
         }
-        boolean registerExpirationHandler;
         if (null == this.zkc) {
             int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds();
-            RetryPolicy retryPolicy = null;
-            if (conf.getBKClientZKNumRetries() > 0) {
-                retryPolicy = new BoundExponentialBackoffRetryPolicy(
+            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                         conf.getBKClientZKRetryBackoffStartMillis(),
                         conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries());
-            }
-
             Credentials credentials = Credentials.NONE;
             if (conf.getZkAclId() != null) {
                 credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId());
@@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
                                            retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
                                            conf.getBKClientZKRequestRateLimit(), credentials);
         }
-        registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0;
 
         try {
-            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer, registerExpirationHandler);
+            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
         } catch (InterruptedException e) {
             throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e);
         } catch (KeeperException e) {
@@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
 
         if (ownZK) {
             LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(),
                     conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         } else {
             LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(),
                     conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         }
     }
 
@@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             }
         }
         if (null != zkc) {
-            if (null != sessionExpireWatcher) {
-                zkc.unregister(sessionExpireWatcher);
-            }
             if (ownZK) {
                 zkc.close();
             }
@@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         closed = true;
     }
 
-    @Override
-    public void notifySessionExpired() {
-        zkSessionExpired.set(true);
-    }
-
     public synchronized void checkClosedOrInError() throws AlreadyClosedException {
         if (closed) {
             LOG.error("BookKeeper Client {} is already closed", name);
             throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed");
         }
-
-        if (zkSessionExpired.get()) {
-            LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name);
-            throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper session has expired");
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index c2057df..5d0e59a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * Get num of retries for zookeeper client that used by bookkeeper client.
      * <p>Retries only happen on retryable failures like session expired,
      * session moved. for permanent failures, the request will fail immediately.
-     * The default value is 3.
+     * The default value is 3. Setting it to zero or negative will retry infinitely.
      *
      * @return num of retries of zookeeper client used by bookkeeper client.
      */
     public int getBKClientZKNumRetries() {
-        return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        if (zkNumRetries <= 0) {
+            return Integer.MAX_VALUE;
+        }
+        return zkNumRetries;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index c0796a1..cb53b23 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
@@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
 
-    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>> {
+    private static final List<String> EMPTY_LIST = ImmutableList.of();
+
+    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
 
         private final String logSegmentsPath;
         private final ZKLogSegmentMetadataStore store;
@@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
 
         @Override
-        public void onSuccess(final List<String> segments) {
+        public void onSuccess(final Versioned<List<String>> segments) {
             // reset the back off after a successful operation
             currentZKBackOffMs = store.minZKBackoffMs;
-            final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath);
+            final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    store.listeners.get(logSegmentsPath);
             if (null != listenerSet) {
                 store.submitTask(logSegmentsPath, new Runnable() {
                     @Override
                     public void run() {
-                        for (LogSegmentNamesListener listener : listenerSet) {
+                        for (VersionedLogSegmentNamesListener listener : listenerSet.values()) {
                             listener.onSegmentsUpdated(segments);
                         }
                     }
@@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
     }
 
+    /**
+     * A log segment names listener that keeps track of the version of the list of log segments it was last notified of.
+     * It only notifies the wrapped listener when a newer version of the log segment list arrives.
+     */
+    static class VersionedLogSegmentNamesListener {
+
+        private final LogSegmentNamesListener listener;
+        private Versioned<List<String>> lastNotifiedLogSegments;
+
+        VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
+            this.listener = listener;
+            this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW);
+        }
+
+        synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) {
+            if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
+                    lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
+                lastNotifiedLogSegments = logSegments;
+                listener.onSegmentsUpdated(logSegments.getValue());
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return listener.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof VersionedLogSegmentNamesListener)) {
+                return false;
+            }
+            VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj;
+            return listener.equals(other.listener);
+        }
+
+        @Override
+        public String toString() {
+            return listener.toString();
+        }
+    }
+
     final DistributedLogConfiguration conf;
     // settings
     final int minZKBackoffMs;
@@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     final ZooKeeperClient zkc;
     // log segment listeners
-    final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners;
+    final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners;
     // scheduler
     final OrderedScheduler scheduler;
     final ReentrantReadWriteLock closeLock;
@@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                                      OrderedScheduler scheduler) {
         this.conf = conf;
         this.zkc = zkc;
-        this.listeners = new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>();
+        this.listeners =
+                new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
         this.scheduler = scheduler;
         this.closeLock = new ReentrantReadWriteLock();
         // settings
@@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     @Override
     public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
-        return getLogSegmentNames(logSegmentsPath, null);
+        return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() {
+            @Override
+            public List<String> apply(Versioned<List<String>> list) {
+                return list.getValue();
+            }
+        });
     }
 
-    Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
-        Promise<List<String>> result = new Promise<List<String>>();
+    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+        Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
     @Override
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        Promise<List<String>> result = ((Promise<List<String>>) ctx);
+        Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(children);
+            /** cversion: the number of changes to the children of this znode **/
+            ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+            result.setValue(new Versioned(children, zkVersion));
         } else {
             result.setException(KeeperException.create(KeeperException.Code.get(rc)));
         }
@@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
-                Set<LogSegmentNamesListener> newListenerSet = new HashSet<LogSegmentNamesListener>();
-                Set<LogSegmentNamesListener> oldListenerSet = listeners.putIfAbsent(logSegmentsPath, newListenerSet);
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
+                        new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
+                        listeners.putIfAbsent(logSegmentsPath, newListenerSet);
                 if (null != oldListenerSet) {
                     listenerSet = oldListenerSet;
                 } else {
@@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                 }
             }
             synchronized (listenerSet) {
-                listenerSet.add(listener);
+                listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
                 if (!listeners.containsKey(logSegmentsPath)) {
                     // listener set has been removed, add it back
                     listeners.put(logSegmentsPath, listenerSet);
@@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
index e29cc47..10a7011 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
@@ -133,9 +133,26 @@ public class BroadCastStatsLogger {
         }
 
         @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            // no-op
+        }
+
+        @Override
         public StatsLogger scope(final String scope) {
             return new Two(first.scope(scope), second.scope(scope));
         }
+
+        @Override
+        public void removeScope(String scope, StatsLogger statsLogger) {
+            if (!(statsLogger instanceof Two)) {
+                return;
+            }
+
+            Two another = (Two) statsLogger;
+
+            first.removeScope(scope, another.first);
+            second.removeScope(scope, another.second);
+        }
     }
 
     /**
@@ -165,6 +182,11 @@ public class BroadCastStatsLogger {
         }
 
         @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            first.unregisterGauge(statName, gauge);
+        }
+
+        @Override
         public StatsLogger scope(String scope) {
             return new MasterSlave(first.scope(scope), second.scope(scope));
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index e4c774b..f8fd3eb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         ZooKeeperClientUtils.expireSession(zkc,
                 DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
 
-        while (numNotifications.get() < 2) {
-            TimeUnit.MILLISECONDS.sleep(10);
-        }
-        assertEquals("Should receive second segment list update",
-                2, numNotifications.get());
-        List<String> secondSegmentList = segmentLists.get(1);
-        Collections.sort(secondSegmentList);
-        assertEquals("List of segments should be same",
-                children, secondSegmentList);
-
         logger.info("Create another {} segments.", numSegments);
 
         // create another log segment, it should trigger segment list updated
@@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
-        while (numNotifications.get() < 3) {
+        while (numNotifications.get() < 2) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
         assertEquals("Should receive third segment list update",
-                3, numNotifications.get());
-        List<String> thirdSegmentList = segmentLists.get(2);
+                2, numNotifications.get());
+        List<String> thirdSegmentList = segmentLists.get(1);
         Collections.sort(thirdSegmentList);
         assertEquals("List of segments should be updated",
                 2 * numSegments, thirdSegmentList.size());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 751e972..3a9b904 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -90,7 +90,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                     // if it is closed, we would not acquire stream again.
                     return null;
                 }
-                writer = streamManager.getOrCreateStream(stream);
+                writer = streamManager.getOrCreateStream(stream, true);
             } finally {
                 closeLock.readLock().unlock();
             }
@@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
     }
 
-    @VisibleForTesting
-    java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
-        closeLock.readLock().lock();
-        try {
-            if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
-                return null;
-            } else if (delayMs > 0) {
-                return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
-            } else {
-                return scheduler.submit(runnable);
-            }
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
     // Test methods.
 
     private DynamicDistributedLogConfiguration getDynConf(String streamName) {
@@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     @VisibleForTesting
-    Stream newStream(String name) {
-        return streamFactory.create(name, getDynConf(name), streamManager);
+    Stream newStream(String name) throws IOException {
+        return streamManager.getOrCreateStream(name, false);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 1204d39..3d5b9e7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamNotReadyException;
@@ -70,24 +69,23 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class StreamImpl implements Stream {
     static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
 
+    /**
+     * The status of the stream.
+     *
+     * The status change of the stream should just go in one direction. If a stream hits
+     * any error, the stream should be put in error state. If a stream is in error state,
+     * it should be removed and not reused anymore.
+     */
     public static enum StreamStatus {
         UNINITIALIZED(-1),
         INITIALIZING(0),
         INITIALIZED(1),
-        // if a stream is in failed state, it could be retried immediately.
-        // a stream will be put in failed state when encountered any stream exception.
-        FAILED(-2),
-        // if a stream is in backoff state, it would backoff for a while.
-        // a stream will be put in backoff state when failed to acquire the ownership.
-        BACKOFF(-3),
         CLOSING(-4),
         CLOSED(-5),
         // if a stream is in error state, it should be abort during closing.
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream {
     private final Partition partition;
     private DistributedLogManager manager;
 
-    // A write has been attempted since the last stream acquire.
-    private volatile boolean writeSinceLastAcquire = false;
     private volatile AsyncLogWriter writer;
     private volatile StreamStatus status;
     private volatile String owner;
     private volatile Throwable lastException;
-    private volatile boolean running = true;
-    private volatile boolean suspended = false;
     private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
 
     private final Promise<Void> closePromise = new Promise<Void>();
     private final Object txnLock = new Object();
     private final TimeSequencer sequencer = new TimeSequencer();
-    // last acquire time
-    private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
-    // last acquire failure time
-    private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted();
-    private final long nextAcquireWaitTimeMs;
-    private ScheduledFuture<?> tryAcquireScheduledFuture = null;
-    private long scheduledAcquireDelayMs = 0L;
     private final StreamRequestLimiter limiter;
     private final DynamicDistributedLogConfiguration dynConf;
     private final DistributedLogConfiguration dlConfig;
@@ -165,7 +152,7 @@ public class StreamImpl implements Stream {
         new ConcurrentHashMap<String, Counter>();
 
     // Since we may create and discard streams at initialization if there's a race,
-    // must not do any expensive intialization here (particularly any locking or
+    // must not do any expensive initialization here (particularly any locking or
     // significant resource allocation etc.).
     StreamImpl(final String name,
                final Partition partition,
@@ -189,7 +176,6 @@ public class StreamImpl implements Stream {
         this.partition = partition;
         this.status = StreamStatus.UNINITIALIZED;
         this.lastException = new IOException("Fail to write record to stream " + name);
-        this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5;
         this.streamConfigProvider = streamConfigProvider;
         this.dlNamespace = dlNamespace;
         this.featureRateLimitDisabled = featureProvider.getFeature(
@@ -275,54 +261,16 @@ public class StreamImpl implements Stream {
         return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status);
     }
 
-    // schedule stream acquistion
-    private void tryAcquireStreamOnce() {
-        if (!running) {
-            return;
-        }
-
-        boolean needAcquire = false;
-        boolean checkNextTime = false;
-        synchronized (this) {
-            switch (this.status) {
-            case INITIALIZING:
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case FAILED:
-                this.status = StreamStatus.INITIALIZING;
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case BACKOFF:
-                // We may end up here after timeout on streamLock. To avoid acquire on every timeout
-                // we should only try again if a write has been attempted since the last acquire
-                // attempt. If we end up here because the request handler woke us up, the flag will
-                // be set and we will try to acquire as intended.
-                if (writeSinceLastAcquire) {
-                    this.status = StreamStatus.INITIALIZING;
-                    streamManager.notifyReleased(this);
-                    needAcquire = true;
-                } else {
-                    checkNextTime = true;
-                }
-                break;
-            default:
-                break;
-            }
-        }
-        if (needAcquire) {
-            lastAcquireWatch.reset().start();
-            acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+    @Override
+    public void start() {
+        // acquire the stream
+        acquireStream().addEventListener(new FutureEventListener<Boolean>() {
                 @Override
                 public void onSuccess(Boolean success) {
-                    synchronized (StreamImpl.this) {
-                        scheduledAcquireDelayMs = 0L;
-                        tryAcquireScheduledFuture = null;
-                    }
                     if (!success) {
-                        // schedule acquire in nextAcquireWaitTimeMs
-                        scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
+                        // failed to acquire the stream. set the stream in error status and close it.
+                        setStreamInErrorStatus();
+                        requestClose("Failed to acquire the ownership");
                     }
                 }
 
@@ -330,65 +278,40 @@ public class StreamImpl implements Stream {
                 public void onFailure(Throwable cause) {
                     // unhandled exceptions
                     logger.error("Stream {} threw unhandled exception : ", name, cause);
+                    // failed to acquire the stream. set the stream in error status and close it.
                     setStreamInErrorStatus();
                     requestClose("Unhandled exception");
                 }
             });
-        } else if (StreamStatus.isUnavailable(status)) {
-            // if the stream is unavailable, stop the thread and close the stream
-            requestClose("Stream is unavailable anymore");
-        } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
-            // if the stream isn't in initialized state and no writes coming in, then close the stream
-            requestClose("Stream not used anymore");
-        } else if (checkNextTime) {
-            synchronized (StreamImpl.this) {
-                scheduledAcquireDelayMs = 0L;
-                tryAcquireScheduledFuture = null;
-            }
-            // schedule acquire in nextAcquireWaitTimeMs
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
     }
 
-    private synchronized void scheduleTryAcquireOnce(long delayMs) {
-        if (null != tryAcquireScheduledFuture) {
-            if (delayMs <= 0) {
-                if (scheduledAcquireDelayMs <= 0L ||
-                        (scheduledAcquireDelayMs > 0L
-                                && !tryAcquireScheduledFuture.cancel(false))) {
-                    return;
-                }
-                // if the scheduled one could be cancelled, re-submit one
-            } else {
-                return;
+    //
+    // Stats Operations
+    //
+
+    void countException(Throwable t, StatsLogger streamExceptionLogger) {
+        String exceptionName = null == t ? "null" : t.getClass().getName();
+        Counter counter = exceptionCounters.get(exceptionName);
+        if (null == counter) {
+            counter = exceptionStatLogger.getCounter(exceptionName);
+            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+            if (null != oldCounter) {
+                counter = oldCounter;
             }
         }
-        tryAcquireScheduledFuture = schedule(new Runnable() {
-            @Override
-            public void run() {
-                tryAcquireStreamOnce();
-            }
-        }, delayMs);
-        scheduledAcquireDelayMs = delayMs;
+        counter.inc();
+        streamExceptionLogger.getCounter(exceptionName).inc();
     }
 
-    @Override
-    public void start() {
-        scheduleTryAcquireOnce(0);
+    boolean isCriticalException(Throwable cause) {
+        return !(cause instanceof OwnershipAcquireFailedException);
     }
 
-    ScheduledFuture<?> schedule(Runnable runnable, long delayMs) {
-        if (!running) {
-            return null;
-        }
-        try {
-            return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        }
-    }
+    //
+    // Service Timeout:
+    // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+    // - if the operation is completed within timeout period, cancel the timeout.
+    //
 
     void scheduleTimeout(final StreamOp op) {
         final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@@ -418,12 +341,14 @@ public class StreamImpl implements Stream {
      * stream off the proxy for a period of time, hopefully long enough for the
      * issues to be resolved, or for whoop to kick in and kill the shard.
      */
-    synchronized void handleServiceTimeout(String reason) {
-        if (StreamStatus.isUnavailable(status)) {
-            return;
+    void handleServiceTimeout(String reason) {
+        synchronized (this) {
+            if (StreamStatus.isUnavailable(status)) {
+                return;
+            }
+            // Mark stream in error state
+            setStreamInErrorStatus();
         }
-        // Mark stream in error state
-        setStreamInErrorStatus();
 
         // Async close request, and schedule eviction when its done.
         Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
@@ -436,6 +361,10 @@ public class StreamImpl implements Stream {
         });
     }
 
+    //
+    // Submit the operation to the stream.
+    //
+
     /**
      * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
      * execution once complete.
@@ -445,9 +374,6 @@ public class StreamImpl implements Stream {
      */
     @Override
     public void submit(StreamOp op) {
-        // Let stream acquire thread know a write has been attempted.
-        writeSinceLastAcquire = true;
-
         try {
             limiter.apply(op);
         } catch (OverCapacityException ex) {
@@ -460,36 +386,28 @@ public class StreamImpl implements Stream {
             scheduleTimeout(op);
         }
 
-        boolean notifyAcquireThread = false;
         boolean completeOpNow = false;
         boolean success = true;
         if (StreamStatus.isUnavailable(status)) {
             // Stream is closed, fail the op immediately
             op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             return;
-        } if (StreamStatus.INITIALIZED == status && writer != null) {
+        } else if (StreamStatus.INITIALIZED == status && writer != null) {
             completeOpNow = true;
             success = true;
         } else {
             synchronized (this) {
                 if (StreamStatus.isUnavailable(status)) {
-                    // complete the write op as {@link #executeOp(op, success)} will handle closed case.
-                    completeOpNow = true;
-                    success = true;
+                    // Stream is closed, fail the op immediately
+                    op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+                    return;
                 } if (StreamStatus.INITIALIZED == status) {
                     completeOpNow = true;
                     success = true;
-                } else if (StreamStatus.BACKOFF == status &&
-                        lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) {
-                    completeOpNow = true;
-                    success = false;
                 } else if (failFastOnStreamNotReady) {
-                    notifyAcquireThread = true;
-                    completeOpNow = false;
-                    success = false;
                     op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
-                } else { // closing & initializing
-                    notifyAcquireThread = true;
+                    return;
+                } else { // the stream is still initializing
                     pendingOps.add(op);
                     pendingOpsCounter.inc();
                     if (1 == pendingOps.size()) {
@@ -500,14 +418,15 @@ public class StreamImpl implements Stream {
                 }
             }
         }
-        if (notifyAcquireThread && !suspended) {
-            scheduleTryAcquireOnce(0L);
-        }
         if (completeOpNow) {
             executeOp(op, success);
         }
     }
 
+    //
+    // Execute operations and handle exceptions on operations
+    //
+
     /**
      * Execute the <i>op</i> immediately.
      *
@@ -516,20 +435,7 @@ public class StreamImpl implements Stream {
      * @param success
      *          whether the operation is success or not.
      */
-    void executeOp(StreamOp op, boolean success) {
-        closeLock.readLock().lock();
-        try {
-            if (StreamStatus.isUnavailable(status)) {
-                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
-                return;
-            }
-            doExecuteOp(op, success);
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    private void doExecuteOp(final StreamOp op, boolean success) {
+    void executeOp(final StreamOp op, boolean success) {
         final AsyncLogWriter writer;
         final Throwable lastException;
         synchronized (this) {
@@ -552,7 +458,7 @@ public class StreamImpl implements Stream {
                         case FOUND:
                             assert(cause instanceof OwnershipAcquireFailedException);
                             countAsException = false;
-                            handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         case ALREADY_CLOSED:
                             assert(cause instanceof AlreadyClosedException);
@@ -573,13 +479,14 @@ public class StreamImpl implements Stream {
                         case OVER_CAPACITY:
                             op.fail(cause);
                             break;
-                        // exceptions that *could* / *might* be recovered by creating a new writer
+                        // the DL writer hit an exception; simply set the stream to error status
+                        // and fail the request
                         default:
-                            handleRecoverableDLException(op, cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         }
                     } else {
-                        handleUnknownException(op, cause);
+                        handleExceptionOnStreamOp(op, cause);
                     }
                     if (countAsException) {
                         countException(cause, streamExceptionStatLogger);
@@ -587,88 +494,41 @@ public class StreamImpl implements Stream {
                 }
             });
         } else {
-            op.fail(lastException);
-        }
-    }
-
-    /**
-     * Handle recoverable dl exception.
-     *
-     * @param op
-     *          stream operation executing
-     * @param cause
-     *          exception received when executing <i>op</i>
-     */
-    private void handleRecoverableDLException(StreamOp op, final Throwable cause) {
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
-                statusChanged = true;
+            if (null != lastException) {
+                op.fail(lastException);
+            } else {
+                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             }
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
     }
 
     /**
-     * Handle unknown exception when executing <i>op</i>.
+     * Handle exception when executing <i>op</i>.
      *
      * @param op
      *          stream operation executing
      * @param cause
      *          exception received when executing <i>op</i>
      */
-    private void handleUnknownException(StreamOp op, final Throwable cause) {
+    private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
         AsyncLogWriter oldWriter = null;
         boolean statusChanged = false;
         synchronized (this) {
             if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
+                oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
                 statusChanged = true;
             }
         }
         if (statusChanged) {
             Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
-    }
-
-    /**
-     * Handle losing ownership during executing <i>op</i>.
-     *
-     * @param op
-     *          stream operation executing
-     * @param oafe
-     *          the ownership exception received when executing <i>op</i>
-     */
-    private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) {
-        logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}",
-                new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter =
-                    setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED,
-                            null, oafe.getCurrentOwner(), oafe);
-                statusChanged = true;
+            if (isCriticalException(cause)) {
+                logger.error("Failed to write data into stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
             }
+            requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
-        op.fail(oafe);
+        op.fail(cause);
     }
 
     /**
@@ -680,129 +540,126 @@ public class StreamImpl implements Stream {
         fatalErrorHandler.notifyFatalError();
     }
 
-    void countException(Throwable t, StatsLogger streamExceptionLogger) {
-        String exceptionName = null == t ? "null" : t.getClass().getName();
-        Counter counter = exceptionCounters.get(exceptionName);
-        if (null == counter) {
-            counter = exceptionStatLogger.getCounter(exceptionName);
-            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
-            if (null != oldCounter) {
-                counter = oldCounter;
-            }
-        }
-        counter.inc();
-        streamExceptionLogger.getCounter(exceptionName).inc();
-    }
+    //
+    // Acquire streams
+    //
 
     Future<Boolean> acquireStream() {
-        // Reset this flag so the acquire thread knows whether re-acquire is needed.
-        writeSinceLastAcquire = false;
-
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final Promise<Boolean> acquirePromise = new Promise<Boolean>();
         manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
 
             @Override
             public void onSuccess(AsyncLogWriter w) {
-                synchronized (txnLock) {
-                    sequencer.setLastId(w.getLastTxId());
-                }
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                synchronized (StreamImpl.this) {
-                    oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
-                            StreamStatus.INITIALIZING, w, null, null);
-                    oldPendingOps = pendingOps;
-                    pendingOps = new ArrayDeque<StreamOp>();
-                    success = true;
-                }
-                // check if the stream is allowed to be acquired
-                if (!streamManager.allowAcquire(StreamImpl.this)) {
-                    if (null != oldWriter) {
-                        Abortables.asyncAbort(oldWriter, true);
-                    }
-                    int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
-                    StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
-                            + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
-                    countException(sue, exceptionStatLogger);
-                    logger.error("Failed to acquire stream {} because it is unavailable : {}",
-                            name, sue.getMessage());
-                    synchronized (this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZED, null, null, sue);
-                        // we don't switch the pending ops since they are already switched
-                        // when setting the status to initialized
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamSuccess(w, stopwatch, acquirePromise);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                if (cause instanceof AlreadyClosedException) {
-                    countException(cause, streamExceptionStatLogger);
-                    handleAlreadyClosedException((AlreadyClosedException) cause);
-                    return;
-                } else if (cause instanceof OwnershipAcquireFailedException) {
-                    OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
-                    logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}",
-                            new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.BACKOFF,
-                                StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else if (cause instanceof InvalidStreamNameException) {
-                    InvalidStreamNameException isne = (InvalidStreamNameException) cause;
-                    countException(isne, streamExceptionStatLogger);
-                    logger.error("Failed to acquire stream {} due to its name is invalid", name);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZING, null, null, isne);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else {
-                    countException(cause, streamExceptionStatLogger);
-                    logger.error("Failed to initialize stream {} : ", name, cause);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.FAILED,
-                                StreamStatus.INITIALIZING, null, null, cause);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamFailure(cause, stopwatch, acquirePromise);
             }
 
-            void processPendingRequestsAfterOpen(boolean success,
-                                                 AsyncLogWriter oldWriter,
-                                                 Queue<StreamOp> oldPendingOps) {
-                if (success) {
-                    streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                } else {
-                    streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                }
-                for (StreamOp op : oldPendingOps) {
-                    executeOp(op, success);
-                    pendingOpsCounter.dec();
-                }
-                Abortables.asyncAbort(oldWriter, true);
-                FutureUtils.setValue(acquirePromise, success);
-            }
         }, scheduler, getStreamName()));
         return acquirePromise;
     }
 
+    private void onAcquireStreamSuccess(AsyncLogWriter w,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        synchronized (txnLock) {
+            sequencer.setLastId(w.getLastTxId());
+        }
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        synchronized (StreamImpl.this) {
+            oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+                    StreamStatus.INITIALIZING, w, null);
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<StreamOp>();
+            success = true;
+        }
+        // check if the stream is allowed to be acquired
+        if (!streamManager.allowAcquire(StreamImpl.this)) {
+            if (null != oldWriter) {
+                Abortables.asyncAbort(oldWriter, true);
+            }
+            int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+            StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+                    + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+            countException(sue, exceptionStatLogger);
+            logger.error("Failed to acquire stream {} because it is unavailable : {}",
+                    name, sue.getMessage());
+            synchronized (this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZED, null, sue);
+                // we don't switch the pending ops since they are already switched
+                // when setting the status to initialized
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    private void onAcquireStreamFailure(Throwable cause,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        if (cause instanceof AlreadyClosedException) {
+            countException(cause, streamExceptionStatLogger);
+            handleAlreadyClosedException((AlreadyClosedException) cause);
+            return;
+        } else {
+            if (isCriticalException(cause)) {
+                countException(cause, streamExceptionStatLogger);
+                logger.error("Failed to acquire stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+            }
+            synchronized (StreamImpl.this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZING, null, cause);
+                oldPendingOps = pendingOps;
+                pendingOps = new ArrayDeque<StreamOp>();
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    /**
+     * Process the pending requests once the stream acquisition attempt has completed.
+     *
+     * @param success whether the acquisition succeeded or not
+     * @param oldWriter the old writer to abort
+     * @param oldPendingOps the old pending ops to execute
+     * @param stopwatch stopwatch to measure the time spent on acquisition
+     * @param acquirePromise the promise to complete the acquire operation
+     */
+    void processPendingRequestsAfterAcquire(boolean success,
+                                            AsyncLogWriter oldWriter,
+                                            Queue<StreamOp> oldPendingOps,
+                                            Stopwatch stopwatch,
+                                            Promise<Boolean> acquirePromise) {
+        if (success) {
+            streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } else {
+            streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+        for (StreamOp op : oldPendingOps) {
+            executeOp(op, success);
+            pendingOpsCounter.dec();
+        }
+        Abortables.asyncAbort(oldWriter, true);
+        FutureUtils.setValue(acquirePromise, success);
+    }
+
+    //
+    // Stream Status Changes
+    //
+
     synchronized void setStreamInErrorStatus() {
         if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
             return;
@@ -819,8 +676,6 @@ public class StreamImpl implements Stream {
      *          old status
      * @param writer
      *          new log writer
-     * @param owner
-     *          new owner
      * @param t
      *          new exception
      * @return old writer if it exists
@@ -828,7 +683,6 @@ public class StreamImpl implements Stream {
     synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
                                                 StreamStatus oldStatus,
                                                 AsyncLogWriter writer,
-                                                String owner,
                                                 Throwable t) {
         if (oldStatus != this.status) {
             logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
@@ -836,6 +690,11 @@ public class StreamImpl implements Stream {
             return null;
         }
 
+        String owner = null;
+        if (t instanceof OwnershipAcquireFailedException) {
+            owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+        }
+
         AsyncLogWriter oldWriter = this.writer;
         this.writer = writer;
         if (null != owner && owner.equals(clientId)) {
@@ -852,10 +711,6 @@ public class StreamImpl implements Stream {
         }
         this.lastException = t;
         this.status = newStatus;
-        if (StreamStatus.BACKOFF == newStatus && null != owner) {
-            // start failure watch
-            this.lastAcquireFailureWatch.reset().start();
-        }
         if (StreamStatus.INITIALIZED == newStatus) {
             streamManager.notifyAcquired(this);
             logger.info("Inserted acquired stream {} -> writer {}", name, this);
@@ -866,12 +721,16 @@ public class StreamImpl implements Stream {
         return oldWriter;
     }
 
+    //
+    // Stream Close Functions
+    //
+
     void close(DistributedLogManager dlm) {
         if (null != dlm) {
             try {
                 dlm.close();
             } catch (IOException ioe) {
-                logger.warn("Failed to close dlm for {} : ", ioe);
+                logger.warn("Failed to close dlm for {} : ", name, ioe);
             }
         }
     }
@@ -902,12 +761,16 @@ public class StreamImpl implements Stream {
         // them.
         close(abort);
         if (uncache) {
+            final long probationTimeoutMs;
+            if (null != owner) {
+                probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+            } else {
+                probationTimeoutMs = 0L;
+            }
             closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
                 @Override
                 public BoxedUnit apply(Void result) {
-                    if (streamManager.notifyRemoved(StreamImpl.this)) {
-                        logger.info("Removed cached stream {} after closed.", name);
-                    }
+                    streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs);
                     return BoxedUnit.UNIT;
                 }
             });
@@ -949,14 +812,6 @@ public class StreamImpl implements Stream {
             closeLock.writeLock().unlock();
         }
         logger.info("Closing stream {} ...", name);
-        running = false;
-        // stop any outstanding ownership acquire actions first
-        synchronized (this) {
-            if (null != tryAcquireScheduledFuture) {
-                tryAcquireScheduledFuture.cancel(true);
-            }
-        }
-        logger.info("Stopped threads of stream {}.", name);
         // Close the writers to release the locks before failing the requests
         Future<Void> closeWriterFuture;
         if (abort) {
@@ -1016,19 +871,6 @@ public class StreamImpl implements Stream {
     // Test-only apis
 
     @VisibleForTesting
-    public StreamImpl suspendAcquiring() {
-        suspended = true;
-        return this;
-    }
-
-    @VisibleForTesting
-    public StreamImpl resumeAcquiring() {
-        suspended = false;
-        scheduleTryAcquireOnce(0L);
-        return this;
-    }
-
-    @VisibleForTesting
     public int numPendingOps() {
         Queue<StreamOp> queue = pendingOps;
         return null == queue ? 0 : queue.size();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index 972eb55..e171e46 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -43,10 +43,11 @@ public interface StreamManager {
 
     /**
      * Get a cached stream and create a new one if it doesnt exist.
-     * @param stream name
+     * @param streamName stream name
+     * @param start whether to start the stream after it is created.
      * @return future satisfied once close complete
      */
-    Stream getOrCreateStream(String stream) throws IOException;
+    Stream getOrCreateStream(String streamName, boolean start) throws IOException;
 
     /**
      * Asynchronously create a new stream.