You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:09:59 UTC
[2/2] incubator-distributedlog git commit: DL-97: Remove unused methods in BKLogHandler
DL-97: Remove unused methods in BKLogHandler
Merge the code changes that remove unused methods in BKLogHandler.
Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>
Author: Leigh Stewart <ls...@twitter.com>
Author: Jordan Bull <jb...@twitter.com>
Author: Dave Rusek <da...@gmail.com>
Author: Dave Rusek <dr...@twitter.com>
Reviewers: Leigh Stewart <ls...@apache.org>
Closes #69 from sijie/merge/DL-97
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/74a33029
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/74a33029
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/74a33029
Branch: refs/heads/master
Commit: 74a33029cc79f3a770ad6c016a42ff4ff7ad71c2
Parents: 5b55bee
Author: Sijie Guo <si...@apache.org>
Authored: Wed Dec 21 00:09:57 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Wed Dec 21 00:09:57 2016 -0800
----------------------------------------------------------------------
.../distributedlog/BKAsyncLogReaderDLSN.java | 15 +-
.../BKDistributedLogNamespace.java | 20 +-
.../twitter/distributedlog/BKLogHandler.java | 140 +----
.../distributedlog/BKLogSegmentWriter.java | 12 +-
.../distributedlog/BookKeeperClient.java | 48 +-
.../DistributedLogConfiguration.java | 8 +-
.../impl/ZKLogSegmentMetadataStore.java | 94 +++-
.../stats/BroadCastStatsLogger.java | 22 +
.../impl/TestZKLogSegmentMetadataStore.java | 22 +-
.../service/DistributedLogServiceImpl.java | 27 +-
.../service/stream/StreamImpl.java | 550 +++++++------------
.../service/stream/StreamManager.java | 5 +-
.../service/stream/StreamManagerImpl.java | 15 +-
.../service/TestDistributedLogService.java | 20 +-
pom.xml | 2 +-
15 files changed, 370 insertions(+), 630 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 7d3d53d..b1a9273 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
@@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1;
* <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
* </ul>
*/
-class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, AsyncNotification {
+class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
@@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
protected final BKDistributedLogManager bkDistributedLogManager;
protected final BKLogReadHandler bkLedgerManager;
- private Watcher sessionExpireWatcher = null;
private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
private final ScheduledExecutorService executorService;
private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
this.executorService = executorService;
this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
lockStateExecutor, this, deserializeRecordSet, true);
- sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this);
LOG.debug("Starting async reader at {}", startDLSN);
this.startDLSN = startDLSN;
this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
}
- @Override
- public void notifySessionExpired() {
- // ZK Session notification is an indication to check if this has resulted in a fatal error
- // of the underlying reader, in itself this reader doesnt error out unless the underlying
- // reader has hit an error
- scheduleBackgroundRead();
- }
-
private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
// Dont run the task more than once every seconds (for sanity)
@@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
cancelAllPendingReads(exception);
- bkLedgerManager.unregister(sessionExpireWatcher);
-
FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
return closePromise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0b522d0..2df1046 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -75,6 +75,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
@@ -252,6 +253,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
}
}
+ private static String getHostIpLockClientId() {
+ try {
+ return InetAddress.getLocalHost().toString();
+ } catch(Exception ex) {
+ return DistributedLogConstants.UNKNOWN_CLIENT_ID;
+ }
+ }
+
private final String clientId;
private final int regionId;
private final DistributedLogConfiguration conf;
@@ -326,9 +335,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
this.featureProvider = featureProvider;
this.statsLogger = statsLogger;
this.perLogStatsLogger = perLogStatsLogger;
- this.clientId = clientId;
this.regionId = regionId;
this.bkdlConfig = bkdlConfig;
+ if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
+ this.clientId = getHostIpLockClientId();
+ } else {
+ this.clientId = clientId;
+ }
// Build resources
StatsLogger schedulerStatsLogger = statsLogger.scope("factory").scope("thread_pool");
@@ -622,13 +635,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
DistributedLogConfiguration conf,
String zkServers,
StatsLogger statsLogger) {
- RetryPolicy retryPolicy = null;
- if (conf.getZKNumRetries() > 0) {
- retryPolicy = new BoundExponentialBackoffRetryPolicy(
+ RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
conf.getBKClientZKRetryBackoffStartMillis(),
conf.getBKClientZKRetryBackoffMaxMillis(),
conf.getBKClientZKNumRetries());
- }
ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
.name(zkcName)
.sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a6ec318..460de11 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -32,9 +32,7 @@ import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.logsegment.LogSegmentCache;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
@@ -56,7 +54,6 @@ import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -104,8 +101,6 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbortable {
static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
- private static final int LAYOUT_VERSION = -1;
-
protected final ZKLogMetadata logMetadata;
protected final DistributedLogConfiguration conf;
protected final ZooKeeperClient zooKeeperClient;
@@ -274,12 +269,7 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
this.bookKeeperClient = bkcBuilder.build();
this.metadataStore = metadataStore;
-
- if (lockClientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
- this.lockClientId = getHostIpLockClientId();
- } else {
- this.lockClientId = lockClientId;
- }
+ this.lockClientId = lockClientId;
this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
.registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
@@ -316,14 +306,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
return lockClientId;
}
- private String getHostIpLockClientId() {
- try {
- return InetAddress.getLocalHost().toString();
- } catch(Exception ex) {
- return DistributedLogConstants.UNKNOWN_CLIENT_ID;
- }
- }
-
protected void registerListener(LogSegmentListener listener) {
listeners.add(listener);
}
@@ -472,57 +454,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
}
}
- public LogRecordWithDLSN getLastLogRecord(boolean recover, boolean includeEndOfStream) throws IOException {
- checkLogStreamExists();
- List<LogSegmentMetadata> ledgerList = getFullLedgerListDesc(true, true);
-
- for (LogSegmentMetadata metadata: ledgerList) {
- LogRecordWithDLSN record = recoverLastRecordInLedger(metadata, recover, false, includeEndOfStream);
-
- if (null != record) {
- assert(!record.isControl());
- LOG.debug("{} getLastLogRecord Returned {}", getFullyQualifiedName(), record);
- return record;
- }
- }
-
- throw new LogEmptyException("Log " + getFullyQualifiedName() + " has no records");
- }
-
- public long getLastTxId(boolean recover,
- boolean includeEndOfStream) throws IOException {
- checkLogStreamExists();
- return getLastLogRecord(recover, includeEndOfStream).getTransactionId();
- }
-
- public DLSN getLastDLSN(boolean recover,
- boolean includeEndOfStream) throws IOException {
- checkLogStreamExists();
- return getLastLogRecord(recover, includeEndOfStream).getDlsn();
- }
-
- public long getLogRecordCount() throws IOException {
- try {
- checkLogStreamExists();
- } catch (LogNotFoundException exc) {
- return 0;
- }
-
- List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, false);
- long count = 0;
- for (LogSegmentMetadata l : ledgerList) {
- if (l.isInProgress()) {
- LogRecord record = recoverLastRecordInLedger(l, false, false, false);
- if (null != record) {
- count += record.getLastPositionWithinLogSegment();
- }
- } else {
- count += l.getRecordCount();
- }
- }
- return count;
- }
-
private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
@@ -634,15 +565,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
return sum;
}
- public long getFirstTxId() throws IOException {
- checkLogStreamExists();
- List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, true);
-
- // The ledger list should at least have one element
- // First TxId is populated even for in progress ledgers
- return ledgerList.get(0).getFirstTxId();
- }
-
Future<Void> checkLogStreamExistsAsync() {
final Promise<Void> promise = new Promise<Void>();
try {
@@ -685,54 +607,11 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
return promise;
}
- private void checkLogStreamExists() throws IOException {
- try {
- if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath())
- .exists(logMetadata.getLogSegmentsPath(), false)) {
- throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist");
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
- throw new DLInterruptedException("Interrupted while checking "
- + logMetadata.getLogSegmentsPath(), ie);
- } catch (KeeperException ke) {
- LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke);
- throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke);
- }
- }
-
@Override
public Future<Void> asyncAbort() {
return asyncClose();
}
- /**
- * Find the id of the last edit log transaction written to a edit log
- * ledger.
- */
- protected Pair<Long, DLSN> readLastTxIdInLedger(LogSegmentMetadata l) throws IOException {
- LogRecordWithDLSN record = recoverLastRecordInLedger(l, false, false, true);
-
- if (null == record) {
- return Pair.of(DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID, DLSN.InvalidDLSN);
- }
- else {
- return Pair.of(record.getTransactionId(), record.getDlsn());
- }
- }
-
- /**
- * Find the id of the last edit log transaction written to a edit log
- * ledger.
- */
- protected LogRecordWithDLSN recoverLastRecordInLedger(LogSegmentMetadata l,
- boolean fence,
- boolean includeControl,
- boolean includeEndOfStream)
- throws IOException {
- return FutureUtils.result(asyncReadLastRecord(l, fence, includeControl, includeEndOfStream));
- }
-
public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
return asyncReadLastRecord(l, false, false, false);
}
@@ -1293,21 +1172,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
}
}
- // ZooKeeper Watchers
-
- Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) {
- if (conf.getZKNumRetries() > 0) {
- return new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // nop
- }
- };
- }
- return zooKeeperClient.registerExpirationHandler(onExpired);
- }
-
- boolean unregister(Watcher watcher) {
- return zooKeeperClient.unregister(watcher);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 1b52951..8276125 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -156,8 +156,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
// stats
private final StatsLogger envelopeStatsLogger;
+ private final StatsLogger transmitOutstandingLogger;
private final Counter transmitDataSuccesses;
private final Counter transmitDataMisses;
+ private final Gauge<Number> transmitOutstandingGauge;
private final OpStatsLogger transmitDataPacketSize;
private final Counter transmitControlSuccesses;
private final Counter pFlushSuccesses;
@@ -255,8 +257,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
pendingWrites = segWriterStatsLogger.getCounter("pending");
// outstanding transmit requests
- StatsLogger transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
- transmitOutstandingLogger.registerGauge("requests", new Gauge<Number>() {
+ transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
+ transmitOutstandingGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
@@ -265,7 +267,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
public Number getSample() {
return outstandingTransmits.get();
}
- });
+ };
+ transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
outstandingTransmits = new AtomicInteger(0);
this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
@@ -531,6 +534,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
private void closeInternal(final boolean abort,
final AtomicReference<Throwable> throwExc,
final Promise<Void> closePromise) {
+ // remove stats
+ this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+
// Cancel the periodic keep alive schedule first
if (null != periodicKeepAliveSchedule) {
if (!periodicKeepAliveSchedule.cancel(false)) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index fd22b8f..c39ae4c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -17,6 +17,7 @@
*/
package com.twitter.distributedlog;
+import com.google.common.base.Optional;
import com.twitter.distributedlog.ZooKeeperClient.Credentials;
import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Optional;
import static com.google.common.base.Charsets.UTF_8;
@@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8;
* <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper}
* </ul>
*/
-public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier {
+public class BookKeeperClient {
static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
// Parameters to build bookkeeper client
@@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
// feature provider
private final Optional<FeatureProvider> featureProvider;
- private Watcher sessionExpireWatcher = null;
- private AtomicBoolean zkSessionExpired = new AtomicBoolean(false);
-
@SuppressWarnings("deprecation")
private synchronized void commonInitialization(
DistributedLogConfiguration conf, String ledgersPath,
- ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer,
- boolean registerExpirationHandler)
+ ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer)
throws IOException, InterruptedException, KeeperException {
ClientConfiguration bkConfig = new ClientConfiguration();
bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
.requestTimer(requestTimer)
.featureProvider(featureProvider.orNull())
.build();
-
- if (registerExpirationHandler) {
- sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
- }
}
BookKeeperClient(DistributedLogConfiguration conf,
@@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
if (null != this.bkc) {
return;
}
- boolean registerExpirationHandler;
if (null == this.zkc) {
int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds();
- RetryPolicy retryPolicy = null;
- if (conf.getBKClientZKNumRetries() > 0) {
- retryPolicy = new BoundExponentialBackoffRetryPolicy(
+ RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
conf.getBKClientZKRetryBackoffStartMillis(),
conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries());
- }
-
Credentials credentials = Credentials.NONE;
if (conf.getZkAclId() != null) {
credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId());
@@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
conf.getBKClientZKRequestRateLimit(), credentials);
}
- registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0;
try {
- commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer, registerExpirationHandler);
+ commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e);
} catch (KeeperException e) {
@@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
if (ownZK) {
LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " +
- "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+ "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
new Object[] { name, ledgersPath,
conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(),
conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(),
- conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+ conf.getBkDNSResolverOverrides() });
} else {
LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " +
- "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+ "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
new Object[] { name, ledgersPath,
conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(),
conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(),
- conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+ conf.getBkDNSResolverOverrides() });
}
}
@@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
}
}
if (null != zkc) {
- if (null != sessionExpireWatcher) {
- zkc.unregister(sessionExpireWatcher);
- }
if (ownZK) {
zkc.close();
}
@@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
closed = true;
}
- @Override
- public void notifySessionExpired() {
- zkSessionExpired.set(true);
- }
-
public synchronized void checkClosedOrInError() throws AlreadyClosedException {
if (closed) {
LOG.error("BookKeeper Client {} is already closed", name);
throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed");
}
-
- if (zkSessionExpired.get()) {
- LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name);
- throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper session has expired");
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index c2057df..5d0e59a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
* Get num of retries for zookeeper client that used by bookkeeper client.
* <p>Retries only happen on retryable failures like session expired,
* session moved. for permanent failures, the request will fail immediately.
- * The default value is 3.
+ * The default value is 3. Setting it to zero or negative will retry infinitely.
*
* @return num of retries of zookeeper client used by bookkeeper client.
*/
public int getBKClientZKNumRetries() {
- return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+ int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+ if (zkNumRetries <= 0) {
+ return Integer.MAX_VALUE;
+ }
+ return zkNumRetries;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index c0796a1..cb53b23 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -17,6 +17,7 @@
*/
package com.twitter.distributedlog.impl;
+import com.google.common.collect.ImmutableList;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.ZooKeeperClient;
@@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
- private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>> {
+ private static final List<String> EMPTY_LIST = ImmutableList.of();
+
+ private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
private final String logSegmentsPath;
private final ZKLogSegmentMetadataStore store;
@@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
}
@Override
- public void onSuccess(final List<String> segments) {
+ public void onSuccess(final Versioned<List<String>> segments) {
// reset the back off after a successful operation
currentZKBackOffMs = store.minZKBackoffMs;
- final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath);
+ final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+ store.listeners.get(logSegmentsPath);
if (null != listenerSet) {
store.submitTask(logSegmentsPath, new Runnable() {
@Override
public void run() {
- for (LogSegmentNamesListener listener : listenerSet) {
+ for (VersionedLogSegmentNamesListener listener : listenerSet.values()) {
listener.onSegmentsUpdated(segments);
}
}
@@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
}
}
+ /**
+ * A log segment names listener that keeps track of the version of the list of log segments it was last notified of.
+ * It only notifies the wrapped listener when a newer list of log segments arrives.
+ */
+ static class VersionedLogSegmentNamesListener {
+
+ private final LogSegmentNamesListener listener;
+ private Versioned<List<String>> lastNotifiedLogSegments;
+
+ VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
+ this.listener = listener;
+ this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW);
+ }
+
+ synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) {
+ if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
+ lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
+ lastNotifiedLogSegments = logSegments;
+ listener.onSegmentsUpdated(logSegments.getValue());
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return listener.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof VersionedLogSegmentNamesListener)) {
+ return false;
+ }
+ VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj;
+ return listener.equals(other.listener);
+ }
+
+ @Override
+ public String toString() {
+ return listener.toString();
+ }
+ }
+
final DistributedLogConfiguration conf;
// settings
final int minZKBackoffMs;
@@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
final ZooKeeperClient zkc;
// log segment listeners
- final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners;
+ final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners;
// scheduler
final OrderedScheduler scheduler;
final ReentrantReadWriteLock closeLock;
@@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
OrderedScheduler scheduler) {
this.conf = conf;
this.zkc = zkc;
- this.listeners = new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>();
+ this.listeners =
+ new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
this.scheduler = scheduler;
this.closeLock = new ReentrantReadWriteLock();
// settings
@@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
@Override
public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
- return getLogSegmentNames(logSegmentsPath, null);
+ return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() {
+ @Override
+ public List<String> apply(Versioned<List<String>> list) {
+ return list.getValue();
+ }
+ });
}
- Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
- Promise<List<String>> result = new Promise<List<String>>();
+ Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+ Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
try {
zkc.get().getChildren(logSegmentsPath, watcher, this, result);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
@Override
@SuppressWarnings("unchecked")
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- Promise<List<String>> result = ((Promise<List<String>>) ctx);
+ Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
if (KeeperException.Code.OK.intValue() == rc) {
- result.setValue(children);
+ /** cversion: the number of changes to the children of this znode **/
+ ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+ result.setValue(new Versioned(children, zkVersion));
} else {
result.setException(KeeperException.create(KeeperException.Code.get(rc)));
}
@@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
if (closed) {
return;
}
- Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+ Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+ listeners.get(logSegmentsPath);
if (null == listenerSet) {
- Set<LogSegmentNamesListener> newListenerSet = new HashSet<LogSegmentNamesListener>();
- Set<LogSegmentNamesListener> oldListenerSet = listeners.putIfAbsent(logSegmentsPath, newListenerSet);
+ Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
+ new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
+ Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
+ listeners.putIfAbsent(logSegmentsPath, newListenerSet);
if (null != oldListenerSet) {
listenerSet = oldListenerSet;
} else {
@@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
}
}
synchronized (listenerSet) {
- listenerSet.add(listener);
+ listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
if (!listeners.containsKey(logSegmentsPath)) {
// listener set has been removed, add it back
listeners.put(logSegmentsPath, listenerSet);
@@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
if (closed) {
return;
}
- Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+ Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+ listeners.get(logSegmentsPath);
if (null == listenerSet) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
index e29cc47..10a7011 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
@@ -133,9 +133,26 @@ public class BroadCastStatsLogger {
}
@Override
+ public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+ // no-op
+ }
+
+ @Override
public StatsLogger scope(final String scope) {
return new Two(first.scope(scope), second.scope(scope));
}
+
+ @Override
+ public void removeScope(String scope, StatsLogger statsLogger) {
+ if (!(statsLogger instanceof Two)) {
+ return;
+ }
+
+ Two another = (Two) statsLogger;
+
+ first.removeScope(scope, another.first);
+ second.removeScope(scope, another.second);
+ }
}
/**
@@ -165,6 +182,11 @@ public class BroadCastStatsLogger {
}
@Override
+ public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+ first.unregisterGauge(statName, gauge);
+ }
+
+ @Override
public StatsLogger scope(String scope) {
return new MasterSlave(first.scope(scope), second.scope(scope));
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index e4c774b..f8fd3eb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
lsmStore.registerLogSegmentListener(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
- assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+ assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
@@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
lsmStore.registerLogSegmentListener(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
- assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+ assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
@@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
lsmStore.registerLogSegmentListener(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
- assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+ assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
@@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
ZooKeeperClientUtils.expireSession(zkc,
DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
- while (numNotifications.get() < 2) {
- TimeUnit.MILLISECONDS.sleep(10);
- }
- assertEquals("Should receive second segment list update",
- 2, numNotifications.get());
- List<String> secondSegmentList = segmentLists.get(1);
- Collections.sort(secondSegmentList);
- assertEquals("List of segments should be same",
- children, secondSegmentList);
-
logger.info("Create another {} segments.", numSegments);
// create another log segment, it should trigger segment list updated
@@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
List<String> newChildren = zkc.get().getChildren(rootPath, false);
Collections.sort(newChildren);
logger.info("All log segments become {}", newChildren);
- while (numNotifications.get() < 3) {
+ while (numNotifications.get() < 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive third segment list update",
- 3, numNotifications.get());
- List<String> thirdSegmentList = segmentLists.get(2);
+ 2, numNotifications.get());
+ List<String> thirdSegmentList = segmentLists.get(1);
Collections.sort(thirdSegmentList);
assertEquals("List of segments should be updated",
2 * numSegments, thirdSegmentList.size());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 751e972..3a9b904 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -90,7 +90,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
// if it is closed, we would not acquire stream again.
return null;
}
- writer = streamManager.getOrCreateStream(stream);
+ writer = streamManager.getOrCreateStream(stream, true);
} finally {
closeLock.readLock().unlock();
}
@@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
}
- @VisibleForTesting
- java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
- closeLock.readLock().lock();
- try {
- if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
- return null;
- } else if (delayMs > 0) {
- return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
- } else {
- return scheduler.submit(runnable);
- }
- } catch (RejectedExecutionException ree) {
- logger.error("Failed to schedule task {} in {} ms : ",
- new Object[] { runnable, delayMs, ree });
- return null;
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
// Test methods.
private DynamicDistributedLogConfiguration getDynConf(String streamName) {
@@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
@VisibleForTesting
- Stream newStream(String name) {
- return streamFactory.create(name, getDynConf(name), streamManager);
+ Stream newStream(String name) throws IOException {
+ return streamManager.getOrCreateStream(name, false);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 1204d39..3d5b9e7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.StreamNotReadyException;
@@ -70,24 +69,23 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StreamImpl implements Stream {
static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
+ /**
+ * The status of the stream.
+ *
+ * The status change of the stream should just go in one direction. If a stream hits
+ * any error, the stream should be put in error state. If a stream is in error state,
+ * it should be removed and not reused anymore.
+ */
public static enum StreamStatus {
UNINITIALIZED(-1),
INITIALIZING(0),
INITIALIZED(1),
- // if a stream is in failed state, it could be retried immediately.
- // a stream will be put in failed state when encountered any stream exception.
- FAILED(-2),
- // if a stream is in backoff state, it would backoff for a while.
- // a stream will be put in backoff state when failed to acquire the ownership.
- BACKOFF(-3),
CLOSING(-4),
CLOSED(-5),
// if a stream is in error state, it should be abort during closing.
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream {
private final Partition partition;
private DistributedLogManager manager;
- // A write has been attempted since the last stream acquire.
- private volatile boolean writeSinceLastAcquire = false;
private volatile AsyncLogWriter writer;
private volatile StreamStatus status;
private volatile String owner;
private volatile Throwable lastException;
- private volatile boolean running = true;
- private volatile boolean suspended = false;
private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
private final Promise<Void> closePromise = new Promise<Void>();
private final Object txnLock = new Object();
private final TimeSequencer sequencer = new TimeSequencer();
- // last acquire time
- private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
- // last acquire failure time
- private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted();
- private final long nextAcquireWaitTimeMs;
- private ScheduledFuture<?> tryAcquireScheduledFuture = null;
- private long scheduledAcquireDelayMs = 0L;
private final StreamRequestLimiter limiter;
private final DynamicDistributedLogConfiguration dynConf;
private final DistributedLogConfiguration dlConfig;
@@ -165,7 +152,7 @@ public class StreamImpl implements Stream {
new ConcurrentHashMap<String, Counter>();
// Since we may create and discard streams at initialization if there's a race,
- // must not do any expensive intialization here (particularly any locking or
+ // must not do any expensive initialization here (particularly any locking or
// significant resource allocation etc.).
StreamImpl(final String name,
final Partition partition,
@@ -189,7 +176,6 @@ public class StreamImpl implements Stream {
this.partition = partition;
this.status = StreamStatus.UNINITIALIZED;
this.lastException = new IOException("Fail to write record to stream " + name);
- this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5;
this.streamConfigProvider = streamConfigProvider;
this.dlNamespace = dlNamespace;
this.featureRateLimitDisabled = featureProvider.getFeature(
@@ -275,54 +261,16 @@ public class StreamImpl implements Stream {
return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status);
}
- // schedule stream acquistion
- private void tryAcquireStreamOnce() {
- if (!running) {
- return;
- }
-
- boolean needAcquire = false;
- boolean checkNextTime = false;
- synchronized (this) {
- switch (this.status) {
- case INITIALIZING:
- streamManager.notifyReleased(this);
- needAcquire = true;
- break;
- case FAILED:
- this.status = StreamStatus.INITIALIZING;
- streamManager.notifyReleased(this);
- needAcquire = true;
- break;
- case BACKOFF:
- // We may end up here after timeout on streamLock. To avoid acquire on every timeout
- // we should only try again if a write has been attempted since the last acquire
- // attempt. If we end up here because the request handler woke us up, the flag will
- // be set and we will try to acquire as intended.
- if (writeSinceLastAcquire) {
- this.status = StreamStatus.INITIALIZING;
- streamManager.notifyReleased(this);
- needAcquire = true;
- } else {
- checkNextTime = true;
- }
- break;
- default:
- break;
- }
- }
- if (needAcquire) {
- lastAcquireWatch.reset().start();
- acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+ @Override
+ public void start() {
+ // acquire the stream
+ acquireStream().addEventListener(new FutureEventListener<Boolean>() {
@Override
public void onSuccess(Boolean success) {
- synchronized (StreamImpl.this) {
- scheduledAcquireDelayMs = 0L;
- tryAcquireScheduledFuture = null;
- }
if (!success) {
- // schedule acquire in nextAcquireWaitTimeMs
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
+ // failed to acquire the stream. set the stream in error status and close it.
+ setStreamInErrorStatus();
+ requestClose("Failed to acquire the ownership");
}
}
@@ -330,65 +278,40 @@ public class StreamImpl implements Stream {
public void onFailure(Throwable cause) {
// unhandled exceptions
logger.error("Stream {} threw unhandled exception : ", name, cause);
+ // failed to acquire the stream. set the stream in error status and close it.
setStreamInErrorStatus();
requestClose("Unhandled exception");
}
});
- } else if (StreamStatus.isUnavailable(status)) {
- // if the stream is unavailable, stop the thread and close the stream
- requestClose("Stream is unavailable anymore");
- } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
- // if the stream isn't in initialized state and no writes coming in, then close the stream
- requestClose("Stream not used anymore");
- } else if (checkNextTime) {
- synchronized (StreamImpl.this) {
- scheduledAcquireDelayMs = 0L;
- tryAcquireScheduledFuture = null;
- }
- // schedule acquire in nextAcquireWaitTimeMs
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
- }
}
- private synchronized void scheduleTryAcquireOnce(long delayMs) {
- if (null != tryAcquireScheduledFuture) {
- if (delayMs <= 0) {
- if (scheduledAcquireDelayMs <= 0L ||
- (scheduledAcquireDelayMs > 0L
- && !tryAcquireScheduledFuture.cancel(false))) {
- return;
- }
- // if the scheduled one could be cancelled, re-submit one
- } else {
- return;
+ //
+ // Stats Operations
+ //
+
+ void countException(Throwable t, StatsLogger streamExceptionLogger) {
+ String exceptionName = null == t ? "null" : t.getClass().getName();
+ Counter counter = exceptionCounters.get(exceptionName);
+ if (null == counter) {
+ counter = exceptionStatLogger.getCounter(exceptionName);
+ Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+ if (null != oldCounter) {
+ counter = oldCounter;
}
}
- tryAcquireScheduledFuture = schedule(new Runnable() {
- @Override
- public void run() {
- tryAcquireStreamOnce();
- }
- }, delayMs);
- scheduledAcquireDelayMs = delayMs;
+ counter.inc();
+ streamExceptionLogger.getCounter(exceptionName).inc();
}
- @Override
- public void start() {
- scheduleTryAcquireOnce(0);
+ boolean isCriticalException(Throwable cause) {
+ return !(cause instanceof OwnershipAcquireFailedException);
}
- ScheduledFuture<?> schedule(Runnable runnable, long delayMs) {
- if (!running) {
- return null;
- }
- try {
- return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS);
- } catch (RejectedExecutionException ree) {
- logger.error("Failed to schedule task {} in {} ms : ",
- new Object[] { runnable, delayMs, ree });
- return null;
- }
- }
+ //
+ // Service Timeout:
+ // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+ // - if the operation is completed within timeout period, cancel the timeout.
+ //
void scheduleTimeout(final StreamOp op) {
final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@@ -418,12 +341,14 @@ public class StreamImpl implements Stream {
* stream off the proxy for a period of time, hopefully long enough for the
* issues to be resolved, or for whoop to kick in and kill the shard.
*/
- synchronized void handleServiceTimeout(String reason) {
- if (StreamStatus.isUnavailable(status)) {
- return;
+ void handleServiceTimeout(String reason) {
+ synchronized (this) {
+ if (StreamStatus.isUnavailable(status)) {
+ return;
+ }
+ // Mark stream in error state
+ setStreamInErrorStatus();
}
- // Mark stream in error state
- setStreamInErrorStatus();
// Async close request, and schedule eviction when its done.
Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
@@ -436,6 +361,10 @@ public class StreamImpl implements Stream {
});
}
+ //
+ // Submit the operation to the stream.
+ //
+
/**
* Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
* execution once complete.
@@ -445,9 +374,6 @@ public class StreamImpl implements Stream {
*/
@Override
public void submit(StreamOp op) {
- // Let stream acquire thread know a write has been attempted.
- writeSinceLastAcquire = true;
-
try {
limiter.apply(op);
} catch (OverCapacityException ex) {
@@ -460,36 +386,28 @@ public class StreamImpl implements Stream {
scheduleTimeout(op);
}
- boolean notifyAcquireThread = false;
boolean completeOpNow = false;
boolean success = true;
if (StreamStatus.isUnavailable(status)) {
// Stream is closed, fail the op immediately
op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
return;
- } if (StreamStatus.INITIALIZED == status && writer != null) {
+ } else if (StreamStatus.INITIALIZED == status && writer != null) {
completeOpNow = true;
success = true;
} else {
synchronized (this) {
if (StreamStatus.isUnavailable(status)) {
- // complete the write op as {@link #executeOp(op, success)} will handle closed case.
- completeOpNow = true;
- success = true;
+ // Stream is closed, fail the op immediately
+ op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+ return;
} if (StreamStatus.INITIALIZED == status) {
completeOpNow = true;
success = true;
- } else if (StreamStatus.BACKOFF == status &&
- lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) {
- completeOpNow = true;
- success = false;
} else if (failFastOnStreamNotReady) {
- notifyAcquireThread = true;
- completeOpNow = false;
- success = false;
op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
- } else { // closing & initializing
- notifyAcquireThread = true;
+ return;
+ } else { // the stream is still initializing
pendingOps.add(op);
pendingOpsCounter.inc();
if (1 == pendingOps.size()) {
@@ -500,14 +418,15 @@ public class StreamImpl implements Stream {
}
}
}
- if (notifyAcquireThread && !suspended) {
- scheduleTryAcquireOnce(0L);
- }
if (completeOpNow) {
executeOp(op, success);
}
}
+ //
+ // Execute operations and handle exceptions on operations
+ //
+
/**
* Execute the <i>op</i> immediately.
*
@@ -516,20 +435,7 @@ public class StreamImpl implements Stream {
* @param success
* whether the operation is success or not.
*/
- void executeOp(StreamOp op, boolean success) {
- closeLock.readLock().lock();
- try {
- if (StreamStatus.isUnavailable(status)) {
- op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
- return;
- }
- doExecuteOp(op, success);
- } finally {
- closeLock.readLock().unlock();
- }
- }
-
- private void doExecuteOp(final StreamOp op, boolean success) {
+ void executeOp(final StreamOp op, boolean success) {
final AsyncLogWriter writer;
final Throwable lastException;
synchronized (this) {
@@ -552,7 +458,7 @@ public class StreamImpl implements Stream {
case FOUND:
assert(cause instanceof OwnershipAcquireFailedException);
countAsException = false;
- handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause);
+ handleExceptionOnStreamOp(op, cause);
break;
case ALREADY_CLOSED:
assert(cause instanceof AlreadyClosedException);
@@ -573,13 +479,14 @@ public class StreamImpl implements Stream {
case OVER_CAPACITY:
op.fail(cause);
break;
- // exceptions that *could* / *might* be recovered by creating a new writer
+ // the DL writer hits exception, simple set the stream to error status
+ // and fail the request
default:
- handleRecoverableDLException(op, cause);
+ handleExceptionOnStreamOp(op, cause);
break;
}
} else {
- handleUnknownException(op, cause);
+ handleExceptionOnStreamOp(op, cause);
}
if (countAsException) {
countException(cause, streamExceptionStatLogger);
@@ -587,88 +494,41 @@ public class StreamImpl implements Stream {
}
});
} else {
- op.fail(lastException);
- }
- }
-
- /**
- * Handle recoverable dl exception.
- *
- * @param op
- * stream operation executing
- * @param cause
- * exception received when executing <i>op</i>
- */
- private void handleRecoverableDLException(StreamOp op, final Throwable cause) {
- AsyncLogWriter oldWriter = null;
- boolean statusChanged = false;
- synchronized (this) {
- if (StreamStatus.INITIALIZED == status) {
- oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
- null, null, cause);
- statusChanged = true;
+ if (null != lastException) {
+ op.fail(lastException);
+ } else {
+ op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
}
}
- if (statusChanged) {
- Abortables.asyncAbort(oldWriter, false);
- logger.error("Failed to write data into stream {} : ", name, cause);
- scheduleTryAcquireOnce(0L);
- }
- op.fail(cause);
}
/**
- * Handle unknown exception when executing <i>op</i>.
+ * Handle exception when executing <i>op</i>.
*
* @param op
* stream operation executing
* @param cause
* exception received when executing <i>op</i>
*/
- private void handleUnknownException(StreamOp op, final Throwable cause) {
+ private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
AsyncLogWriter oldWriter = null;
boolean statusChanged = false;
synchronized (this) {
if (StreamStatus.INITIALIZED == status) {
- oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
- null, null, cause);
+ oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
statusChanged = true;
}
}
if (statusChanged) {
Abortables.asyncAbort(oldWriter, false);
- logger.error("Failed to write data into stream {} : ", name, cause);
- scheduleTryAcquireOnce(0L);
- }
- op.fail(cause);
- }
-
- /**
- * Handle losing ownership during executing <i>op</i>.
- *
- * @param op
- * stream operation executing
- * @param oafe
- * the ownership exception received when executing <i>op</i>
- */
- private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) {
- logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}",
- new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
- AsyncLogWriter oldWriter = null;
- boolean statusChanged = false;
- synchronized (this) {
- if (StreamStatus.INITIALIZED == status) {
- oldWriter =
- setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED,
- null, oafe.getCurrentOwner(), oafe);
- statusChanged = true;
+ if (isCriticalException(cause)) {
+ logger.error("Failed to write data into stream {} : ", name, cause);
+ } else {
+ logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
}
+ requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
}
- if (statusChanged) {
- Abortables.asyncAbort(oldWriter, false);
- scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
- }
- op.fail(oafe);
+ op.fail(cause);
}
/**
@@ -680,129 +540,126 @@ public class StreamImpl implements Stream {
fatalErrorHandler.notifyFatalError();
}
- void countException(Throwable t, StatsLogger streamExceptionLogger) {
- String exceptionName = null == t ? "null" : t.getClass().getName();
- Counter counter = exceptionCounters.get(exceptionName);
- if (null == counter) {
- counter = exceptionStatLogger.getCounter(exceptionName);
- Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
- if (null != oldCounter) {
- counter = oldCounter;
- }
- }
- counter.inc();
- streamExceptionLogger.getCounter(exceptionName).inc();
- }
+ //
+ // Acquire streams
+ //
Future<Boolean> acquireStream() {
- // Reset this flag so the acquire thread knows whether re-acquire is needed.
- writeSinceLastAcquire = false;
-
final Stopwatch stopwatch = Stopwatch.createStarted();
final Promise<Boolean> acquirePromise = new Promise<Boolean>();
manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter w) {
- synchronized (txnLock) {
- sequencer.setLastId(w.getLastTxId());
- }
- AsyncLogWriter oldWriter;
- Queue<StreamOp> oldPendingOps;
- boolean success;
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
- StreamStatus.INITIALIZING, w, null, null);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = true;
- }
- // check if the stream is allowed to be acquired
- if (!streamManager.allowAcquire(StreamImpl.this)) {
- if (null != oldWriter) {
- Abortables.asyncAbort(oldWriter, true);
- }
- int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
- StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
- + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
- countException(sue, exceptionStatLogger);
- logger.error("Failed to acquire stream {} because it is unavailable : {}",
- name, sue.getMessage());
- synchronized (this) {
- oldWriter = setStreamStatus(StreamStatus.ERROR,
- StreamStatus.INITIALIZED, null, null, sue);
- // we don't switch the pending ops since they are already switched
- // when setting the status to initialized
- success = false;
- }
- }
- processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+ onAcquireStreamSuccess(w, stopwatch, acquirePromise);
}
@Override
public void onFailure(Throwable cause) {
- AsyncLogWriter oldWriter;
- Queue<StreamOp> oldPendingOps;
- boolean success;
- if (cause instanceof AlreadyClosedException) {
- countException(cause, streamExceptionStatLogger);
- handleAlreadyClosedException((AlreadyClosedException) cause);
- return;
- } else if (cause instanceof OwnershipAcquireFailedException) {
- OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
- logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}",
- new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.BACKOFF,
- StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- } else if (cause instanceof InvalidStreamNameException) {
- InvalidStreamNameException isne = (InvalidStreamNameException) cause;
- countException(isne, streamExceptionStatLogger);
- logger.error("Failed to acquire stream {} due to its name is invalid", name);
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.ERROR,
- StreamStatus.INITIALIZING, null, null, isne);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- } else {
- countException(cause, streamExceptionStatLogger);
- logger.error("Failed to initialize stream {} : ", name, cause);
- synchronized (StreamImpl.this) {
- oldWriter = setStreamStatus(StreamStatus.FAILED,
- StreamStatus.INITIALIZING, null, null, cause);
- oldPendingOps = pendingOps;
- pendingOps = new ArrayDeque<StreamOp>();
- success = false;
- }
- }
- processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+ onAcquireStreamFailure(cause, stopwatch, acquirePromise);
}
- void processPendingRequestsAfterOpen(boolean success,
- AsyncLogWriter oldWriter,
- Queue<StreamOp> oldPendingOps) {
- if (success) {
- streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- } else {
- streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
- }
- for (StreamOp op : oldPendingOps) {
- executeOp(op, success);
- pendingOpsCounter.dec();
- }
- Abortables.asyncAbort(oldWriter, true);
- FutureUtils.setValue(acquirePromise, success);
- }
}, scheduler, getStreamName()));
return acquirePromise;
}
+ private void onAcquireStreamSuccess(AsyncLogWriter w,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ synchronized (txnLock) {
+ sequencer.setLastId(w.getLastTxId());
+ }
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+ StreamStatus.INITIALIZING, w, null);
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = true;
+ }
+ // check if the stream is allowed to be acquired
+ if (!streamManager.allowAcquire(StreamImpl.this)) {
+ if (null != oldWriter) {
+ Abortables.asyncAbort(oldWriter, true);
+ }
+ int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+ StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+ + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+ countException(sue, exceptionStatLogger);
+ logger.error("Failed to acquire stream {} because it is unavailable : {}",
+ name, sue.getMessage());
+ synchronized (this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
+ StreamStatus.INITIALIZED, null, sue);
+ // we don't switch the pending ops since they are already switched
+ // when setting the status to initialized
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+ }
+
+ private void onAcquireStreamFailure(Throwable cause,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ AsyncLogWriter oldWriter;
+ Queue<StreamOp> oldPendingOps;
+ boolean success;
+ if (cause instanceof AlreadyClosedException) {
+ countException(cause, streamExceptionStatLogger);
+ handleAlreadyClosedException((AlreadyClosedException) cause);
+ return;
+ } else {
+ if (isCriticalException(cause)) {
+ countException(cause, streamExceptionStatLogger);
+ logger.error("Failed to acquire stream {} : ", name, cause);
+ } else {
+ logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+ }
+ synchronized (StreamImpl.this) {
+ oldWriter = setStreamStatus(StreamStatus.ERROR,
+ StreamStatus.INITIALIZING, null, cause);
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<StreamOp>();
+ success = false;
+ }
+ }
+ processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+ }
+
+ /**
+ * Process the pending requests once the stream acquisition attempt has completed.
+ *
+ * @param success whether the acquisition succeeded or not
+ * @param oldWriter the old writer to abort
+ * @param oldPendingOps the old pending ops to execute
+ * @param stopwatch stopwatch to measure the time spent on acquisition
+ * @param acquirePromise the promise to complete the acquire operation
+ */
+ void processPendingRequestsAfterAcquire(boolean success,
+ AsyncLogWriter oldWriter,
+ Queue<StreamOp> oldPendingOps,
+ Stopwatch stopwatch,
+ Promise<Boolean> acquirePromise) {
+ if (success) {
+ streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ } else {
+ streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ }
+ for (StreamOp op : oldPendingOps) {
+ executeOp(op, success);
+ pendingOpsCounter.dec();
+ }
+ Abortables.asyncAbort(oldWriter, true);
+ FutureUtils.setValue(acquirePromise, success);
+ }
+
+ //
+ // Stream Status Changes
+ //
+
synchronized void setStreamInErrorStatus() {
if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
return;
@@ -819,8 +676,6 @@ public class StreamImpl implements Stream {
* old status
* @param writer
* new log writer
- * @param owner
- * new owner
* @param t
* new exception
* @return old writer if it exists
@@ -828,7 +683,6 @@ public class StreamImpl implements Stream {
synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
StreamStatus oldStatus,
AsyncLogWriter writer,
- String owner,
Throwable t) {
if (oldStatus != this.status) {
logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
@@ -836,6 +690,11 @@ public class StreamImpl implements Stream {
return null;
}
+ String owner = null;
+ if (t instanceof OwnershipAcquireFailedException) {
+ owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+ }
+
AsyncLogWriter oldWriter = this.writer;
this.writer = writer;
if (null != owner && owner.equals(clientId)) {
@@ -852,10 +711,6 @@ public class StreamImpl implements Stream {
}
this.lastException = t;
this.status = newStatus;
- if (StreamStatus.BACKOFF == newStatus && null != owner) {
- // start failure watch
- this.lastAcquireFailureWatch.reset().start();
- }
if (StreamStatus.INITIALIZED == newStatus) {
streamManager.notifyAcquired(this);
logger.info("Inserted acquired stream {} -> writer {}", name, this);
@@ -866,12 +721,16 @@ public class StreamImpl implements Stream {
return oldWriter;
}
+ //
+ // Stream Close Functions
+ //
+
void close(DistributedLogManager dlm) {
if (null != dlm) {
try {
dlm.close();
} catch (IOException ioe) {
- logger.warn("Failed to close dlm for {} : ", ioe);
+ logger.warn("Failed to close dlm for {} : ", name, ioe);
}
}
}
@@ -902,12 +761,16 @@ public class StreamImpl implements Stream {
// them.
close(abort);
if (uncache) {
+ final long probationTimeoutMs;
+ if (null != owner) {
+ probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+ } else {
+ probationTimeoutMs = 0L;
+ }
closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
@Override
public BoxedUnit apply(Void result) {
- if (streamManager.notifyRemoved(StreamImpl.this)) {
- logger.info("Removed cached stream {} after closed.", name);
- }
+ streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs);
return BoxedUnit.UNIT;
}
});
@@ -949,14 +812,6 @@ public class StreamImpl implements Stream {
closeLock.writeLock().unlock();
}
logger.info("Closing stream {} ...", name);
- running = false;
- // stop any outstanding ownership acquire actions first
- synchronized (this) {
- if (null != tryAcquireScheduledFuture) {
- tryAcquireScheduledFuture.cancel(true);
- }
- }
- logger.info("Stopped threads of stream {}.", name);
// Close the writers to release the locks before failing the requests
Future<Void> closeWriterFuture;
if (abort) {
@@ -1016,19 +871,6 @@ public class StreamImpl implements Stream {
// Test-only apis
@VisibleForTesting
- public StreamImpl suspendAcquiring() {
- suspended = true;
- return this;
- }
-
- @VisibleForTesting
- public StreamImpl resumeAcquiring() {
- suspended = false;
- scheduleTryAcquireOnce(0L);
- return this;
- }
-
- @VisibleForTesting
public int numPendingOps() {
Queue<StreamOp> queue = pendingOps;
return null == queue ? 0 : queue.size();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/74a33029/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index 972eb55..e171e46 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -43,10 +43,11 @@ public interface StreamManager {
/**
* Get a cached stream and create a new one if it doesnt exist.
- * @param stream name
+ * @param streamName stream name
+ * @param start whether to start the stream after it is created.
* @return future satisfied once close complete
*/
- Stream getOrCreateStream(String stream) throws IOException;
+ Stream getOrCreateStream(String streamName, boolean start) throws IOException;
/**
* Asynchronously create a new stream.