You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/04 18:54:25 UTC
svn commit: r1417066 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/common/
hedwig-server/src/main/java/org/apache/hedwig/server/persistence/
hedwig-server/src/test/java/org/apache/hedwig/server/persistence/
Author: ivank
Date: Tue Dec 4 17:54:23 2012
New Revision: 1417066
URL: http://svn.apache.org/viewvc?rev=1417066&view=rev
Log:
BOOKKEEPER-461: Delivery throughput degrades when there are lots of publishers w/ high traffic. (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Dec 4 17:54:23 2012
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
BOOKKEEPER-442: Failed to deliver messages due to inconsistency between SubscriptionState and LedgerRanges. (jiannan via ivank)
+ BOOKKEEPER-461: Delivery throughput degrades when there are lots of publishers w/ high traffic. (sijie via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Tue Dec 4 17:54:23 2012
@@ -66,6 +66,7 @@ public class ServerConfiguration extends
protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
"default_message_window_size";
+ protected final static String NUM_READAHEAD_CACHE_THREADS = "num_readahead_cache_threads";
protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
@@ -380,6 +381,15 @@ public class ServerConfiguration extends
}
/**
+ * Get number of read ahead cache threads.
+ *
+ * @return number of read ahead cache threads.
+ */
+ public int getNumReadAheadCacheThreads() {
+ return conf.getInt(NUM_READAHEAD_CACHE_THREADS, Runtime.getRuntime().availableProcessors());
+ }
+
+ /**
* Whether enable metadata manager based topic manager.
*
* @return true if enabled metadata manager based topic manager.
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Tue Dec 4 17:54:23 2012
@@ -28,8 +28,12 @@ import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.slf4j.Logger;
@@ -38,6 +42,8 @@ import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -68,7 +74,8 @@ public class ReadAheadCache implements P
/**
* The structure for the cache
*/
- protected Map<CacheKey, CacheValue> cache = new HashMap<CacheKey, CacheValue>();
+ protected ConcurrentMap<CacheKey, CacheValue> cache =
+ new ConcurrentHashMap<CacheKey, CacheValue>();
/**
* To simplify synchronization, the cache will be maintained by a single
@@ -87,13 +94,14 @@ public class ReadAheadCache implements P
* We also want to track the entries in seq-id order so that we can clean up
* entries after the last subscriber
*/
- protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new HashMap<ByteString, SortedSet<Long>>();
+ protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId =
+ new HashMap<ByteString, SortedSet<Long>>();
/**
* We maintain an estimate of the current size of the cache, so that we know
* when to evict entries.
*/
- protected long presentCacheSize = 0;
+ protected AtomicLong presentCacheSize = new AtomicLong(0);
/**
* One instance of a callback that we will pass to the underlying
@@ -111,7 +119,10 @@ public class ReadAheadCache implements P
protected Thread cacheThread;
// Boolean indicating if this thread should continue running. This is used
// when we want to stop the thread during a PubSubServer shutdown.
- protected boolean keepRunning = true;
+ protected volatile boolean keepRunning = true;
+
+ protected final OrderedSafeExecutor cacheWorkers;
+ protected final long maxCacheSize;
// JMX Beans
ReadAheadCacheBean jmxCacheBean = null;
@@ -125,6 +136,8 @@ public class ReadAheadCache implements P
this.realPersistenceManager = realPersistenceManager;
this.cfg = cfg;
cacheThread = new Thread(this, "CacheThread");
+ cacheWorkers = new OrderedSafeExecutor(cfg.getNumReadAheadCacheThreads());
+ maxCacheSize = cfg.getMaximumCacheSize();
}
public ReadAheadCache start() {
@@ -205,11 +218,28 @@ public class ReadAheadCache implements P
// cache
CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), resultOfOperation.getLocalComponent());
- enqueueWithoutFailure(new ScanResponse(cacheKey, messageWithLocalSeqId));
+ enqueueWithoutFailureByTopic(cacheKey.getTopic(),
+ new ScanResponse(cacheKey, messageWithLocalSeqId));
}
}
+ protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
+ if (!keepRunning) {
+ return;
+ }
+ try {
+ cacheWorkers.submitOrdered(topic, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ obj.performRequest();
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ logger.error("Failed to submit cache request for topic " + topic.toStringUtf8() + " : ", ree);
+ }
+ }
+
/**
* Too complicated to deal with enqueue failures from the context of our
* callbacks. Its just simpler to quit and restart afresh. Moreover, this
@@ -234,7 +264,8 @@ public class ReadAheadCache implements P
*/
public void scanSingleMessage(ScanRequest request) {
// Let the scan requests be serialized through the queue
- enqueueWithoutFailure(new ScanRequestWrapper(request));
+ enqueueWithoutFailureByTopic(request.getTopic(),
+ new ScanRequestWrapper(request));
}
/**
@@ -358,7 +389,10 @@ public class ReadAheadCache implements P
break;
}
CacheValue cacheValue = new CacheValue();
- cache.put(cacheKey, cacheValue);
+ if (null != cache.putIfAbsent(cacheKey, cacheValue)) {
+ logger.warn("It is unexpected that more than one threads are adding message to cache key {}"
+ +" at the same time.", cacheKey);
+ }
logger.debug("Adding cache stub for: {}", cacheKey);
installedStubs.add(cacheKey);
@@ -405,7 +439,7 @@ public class ReadAheadCache implements P
// Any message we read is potentially useful for us, so lets first
// enqueue it
CacheKey cacheKey = new CacheKey(topic, message.getMsgId().getLocalComponent());
- enqueueWithoutFailure(new ScanResponse(cacheKey, message));
+ enqueueWithoutFailureByTopic(topic, new ScanResponse(cacheKey, message));
// Now lets see if this message is the one we were expecting
CacheKey expectedKey = installedStubs.peek();
@@ -453,7 +487,8 @@ public class ReadAheadCache implements P
private void enqueueDeleteOfRemainingStubs(Exception reason) {
CacheKey installedStub;
while ((installedStub = installedStubs.poll()) != null) {
- enqueueWithoutFailure(new ExceptionOnCacheKey(installedStub, reason));
+ enqueueWithoutFailureByTopic(installedStub.getTopic(),
+ new ExceptionOnCacheKey(installedStub, reason));
}
}
}
@@ -482,57 +517,87 @@ public class ReadAheadCache implements P
* @param cacheKey
* @param message
*/
- protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
+ protected void addMessageToCache(final CacheKey cacheKey,
+ final Message message, final long currTime) {
logger.debug("Adding msg {} to readahead cache", cacheKey);
CacheValue cacheValue;
if ((cacheValue = cache.get(cacheKey)) == null) {
cacheValue = new CacheValue();
- cache.put(cacheKey, cacheValue);
+ CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue);
+ if (null != oldValue) {
+ logger.warn("Weird! Should not have two threads adding message to cache key {} at the same time.",
+ cacheKey);
+ cacheValue = oldValue;
+ }
}
// update the cache size
- presentCacheSize += message.getBody().size();
-
- // maintain the time index of addition
- MapMethods.addToMultiMap(timeIndexOfAddition, currTime, cacheKey, HashSetCacheKeyFactory.instance);
+ final long newCacheSize = presentCacheSize.addAndGet(message.getBody().size());
- // maintain the index of seq-id
- MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
- TreeSetLongFactory.instance);
-
- // finally add the message to the cache
- cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+ synchronized (cacheValue) {
+ // finally add the message to the cache
+ cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+ }
// if overgrown, collect old entries
- collectOldCacheEntries();
- }
-
- protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
- boolean maintainSeqIdIndex) {
+ enqueueWithoutFailure(new CacheRequest() {
+ @Override
+ public void performRequest() {
+ // maintain the index of seq-id
+ MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+ cacheKey.getSeqId(), TreeSetLongFactory.instance);
+
+ // maintain the time index of addition
+ MapMethods.addToMultiMap(timeIndexOfAddition, currTime,
+ cacheKey, HashSetCacheKeyFactory.instance);
+ // update time index
+ if (newCacheSize > maxCacheSize) {
+ collectOldCacheEntries();
+ }
+ }
+ });
+ }
+
+ protected void removeMessageFromCache(final CacheKey cacheKey, Exception exception,
+ final boolean maintainTimeIndex,
+ final boolean maintainSeqIdIndex) {
CacheValue cacheValue = cache.remove(cacheKey);
if (cacheValue == null) {
return;
}
- if (cacheValue.isStub()) {
- cacheValue.setErrorAndInvokeCallbacks(exception);
- // Stubs are not present in the indexes, so dont need to maintain
- // indexes here
- return;
- }
-
- presentCacheSize -= cacheValue.getMessage().getBody().size();
+ long timeOfAddition = 0;
+ synchronized (cacheValue) {
+ if (cacheValue.isStub()) {
+ cacheValue.setErrorAndInvokeCallbacks(exception);
+ // Stubs are not present in the indexes, so don't need to maintain
+ // indexes here
+ return;
+ }
- // maintain the 2 indexes
- // TODO: can we maintain these lazily?
- if (maintainSeqIdIndex) {
- MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId());
+ presentCacheSize.addAndGet(0 - cacheValue.getMessage().getBody().size());
+ timeOfAddition = cacheValue.getTimeOfAddition();
}
- if (maintainTimeIndex) {
- MapMethods.removeFromMultiMap(timeIndexOfAddition, cacheValue.getTimeOfAddition(), cacheKey);
+
+ // maintain the 2 indexes lazily
+ if (maintainSeqIdIndex || maintainTimeIndex) {
+ final long additionTime = timeOfAddition;
+ enqueueWithoutFailure(new CacheRequest() {
+ @Override
+ public void performRequest() {
+ if (maintainSeqIdIndex) {
+ MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+ cacheKey.getSeqId());
+ }
+ if (maintainTimeIndex) {
+ MapMethods.removeFromMultiMap(timeIndexOfAddition, additionTime,
+ cacheKey);
+ }
+ }
+ });
}
}
@@ -541,9 +606,8 @@ public class ReadAheadCache implements P
* oldest to newest.
*/
protected void collectOldCacheEntries() {
- long maxCacheSize = cfg.getMaximumCacheSize();
-
- while (presentCacheSize > maxCacheSize && !timeIndexOfAddition.isEmpty()) {
+ while (presentCacheSize.get() > cfg.getMaximumCacheSize () &&
+ !timeIndexOfAddition.isEmpty()) {
Long earliestTime = timeIndexOfAddition.firstKey();
Set<CacheKey> oldCacheEntries = timeIndexOfAddition.get(earliestTime);
@@ -551,7 +615,7 @@ public class ReadAheadCache implements P
// index. Hence there can be no callbacks pending on these cache
// entries. Hence safe to remove them directly.
for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
- CacheKey cacheKey = iter.next();
+ final CacheKey cacheKey = iter.next();
logger.debug("Removing {} from cache because it's the oldest.", cacheKey);
removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
@@ -562,7 +626,6 @@ public class ReadAheadCache implements P
}
timeIndexOfAddition.remove(earliestTime);
-
}
}
@@ -691,11 +754,20 @@ public class ReadAheadCache implements P
// Read ahead must have installed at least a stub for us, so this
// can't be null
- CacheValue cacheValue = cache.get(new CacheKey(request.getTopic(), request.getStartSeqId()));
+ CacheKey cacheKey = new CacheKey(request.getTopic(), request.getStartSeqId());
+ CacheValue cacheValue = cache.get(cacheKey);
+ if (null == cacheValue) {
+ logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey);
+ // reissue the request
+ scanSingleMessage(request);
+ return;
+ }
- // Add our callback to the stub. If the cache value was already a
- // concrete message, the callback will be called right away
- cacheValue.addCallback(request.getCallback(), request.getCtx());
+ synchronized (cacheValue) {
+ // Add our callback to the stub. If the cache value was already a
+ // concrete message, the callback will be called right away
+ cacheValue.addCallback(request.getCallback(), request.getCtx());
+ }
if (readAheadRequest != null) {
realPersistenceManager.scanMessages(readAheadRequest);
@@ -709,6 +781,7 @@ public class ReadAheadCache implements P
// thread.
public void performRequest() {
keepRunning = false;
+ cacheWorkers.shutdown();
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java Tue Dec 4 17:54:23 2012
@@ -48,7 +48,7 @@ public class ReadAheadCacheBean implemen
@Override
public long getPresentCacheSize() {
- return cache.presentCacheSize;
+ return cache.presentCacheSize.get();
}
@Override
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Tue Dec 4 17:54:23 2012
@@ -27,6 +27,7 @@ import org.junit.Before;
import org.junit.Test;
import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.HelperMethods;
import org.apache.hedwig.StubCallback;
import org.apache.hedwig.StubScanCallback;
@@ -53,6 +54,12 @@ public class TestReadAheadCacheWhiteBox
// make it perform in the same thread
obj.performRequest();
}
+
+ @Override
+ protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
+ // make it perform in the same thread
+ obj.performRequest();
+ }
}
class MyServerConfiguration extends ServerConfiguration {
@@ -145,7 +152,7 @@ public class TestReadAheadCacheWhiteBox
for (Message m : messages) {
persistMessage(m);
}
- assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+ assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
long middle = messages.size() / 2;
cacheBasedPersistenceManager.deliveredUntil(topic, middle);
@@ -162,7 +169,7 @@ public class TestReadAheadCacheWhiteBox
assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
- assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize);
+ assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize.get());
}
@@ -231,9 +238,9 @@ public class TestReadAheadCacheWhiteBox
@Test
public void testAddMessageToCache() {
CacheKey key = new CacheKey(topic, 1);
- cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+ cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
assertEquals(1, cacheBasedPersistenceManager.cache.size());
- assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+ assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
@@ -244,7 +251,7 @@ public class TestReadAheadCacheWhiteBox
@Test
public void testRemoveMessageFromCache() {
CacheKey key = new CacheKey(topic, 1);
- cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+ cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true);
assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());