You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/07/13 17:23:25 UTC
svn commit: r1502803 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
Author: fpj
Date: Sat Jul 13 15:23:25 2013
New Revision: 1502803
URL: http://svn.apache.org/r1502803
Log:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1502803&r1=1502802&r2=1502803&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 13 15:23:25 2013
@@ -70,6 +70,10 @@ Trunk (unreleased changes)
BOOKKEEPER-633: ConcurrentModificationException in RackawareEnsemblePlacementPolicy when a bookie is removed from available list (vinay via sijie)
+ hedwig-server:
+
+ BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
+
hedwig-client:
BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1502803&r1=1502802&r2=1502803&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Sat Jul 13 15:23:25 2013
@@ -17,11 +17,9 @@
*/
package org.apache.hedwig.server.persistence;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
@@ -30,33 +28,28 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.UnexpectedError;
import org.apache.hedwig.server.jmx.HedwigJMXService;
import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
-import org.apache.hedwig.server.persistence.ReadAheadCacheBean;
import org.apache.hedwig.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
@@ -186,10 +179,12 @@ public class ReadAheadCache implements P
* the real persistence manager.
*/
+ @Override
public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount);
}
+ @Override
public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
return realPersistenceManager.getCurrentSeqIdForTopic(topic);
}
@@ -203,6 +198,7 @@ public class ReadAheadCache implements P
* our callback on the return path
*
*/
+ @Override
public void persistMessage(PersistRequest request) {
// make a new PersistRequest object so that we can insert our own
// callback in the middle. Assign the original request as the context
@@ -225,6 +221,7 @@ public class ReadAheadCache implements P
* In case there is a failure in persisting, just pass it to the
* original callback
*/
+ @Override
public void operationFailed(Object ctx, PubSubException exception) {
PersistRequest originalRequest = (PersistRequest) ctx;
Callback<PubSubProtocol.MessageSeqId> originalCallback = originalRequest.getCallback();
@@ -237,6 +234,7 @@ public class ReadAheadCache implements P
* success, and then opportunistically treat the message as if it just
* came in through a scan
*/
+ @Override
public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
PersistRequest originalRequest = (PersistRequest) ctx;
@@ -283,6 +281,7 @@ public class ReadAheadCache implements P
* 2. Scan - Since the scan needs to touch the cache, we will just enqueue
* the scan request and let the cache maintainer thread handle it.
*/
+ @Override
public void scanSingleMessage(ScanRequest request) {
// Let the scan requests be serialized through the queue
enqueueWithoutFailureByTopic(request.getTopic(),
@@ -295,6 +294,7 @@ public class ReadAheadCache implements P
* 3. Enqueue the request so that the cache maintainer thread can delete all
* message-ids older than the one specified
*/
+ @Override
public void deliveredUntil(ByteString topic, Long seqId) {
enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId));
}
@@ -308,18 +308,22 @@ public class ReadAheadCache implements P
* get aged out of the cache eventually. For now, there is no need to
* proactively remove those entries from the cache.
*/
+ @Override
public void consumedUntil(ByteString topic, Long seqId) {
realPersistenceManager.consumedUntil(topic, seqId);
}
+ @Override
public void setMessageBound(ByteString topic, Integer bound) {
realPersistenceManager.setMessageBound(topic, bound);
}
+ @Override
public void clearMessageBound(ByteString topic) {
realPersistenceManager.clearMessageBound(topic);
}
+ @Override
public void consumeToBound(ByteString topic) {
realPersistenceManager.consumeToBound(topic);
}
@@ -327,6 +331,7 @@ public class ReadAheadCache implements P
/**
* Stop the readahead cache.
*/
+ @Override
public void stop() {
try {
keepRunning = false;
@@ -439,6 +444,7 @@ public class ReadAheadCache implements P
this.topic = topic;
}
+ @Override
public void messageScanned(Object ctx, Message message) {
// Any message we read is potentially useful for us, so lets first
@@ -473,10 +479,12 @@ public class ReadAheadCache implements P
}
+ @Override
public void scanFailed(Object ctx, Exception exception) {
enqueueDeleteOfRemainingStubs(exception);
}
+ @Override
public void scanFinished(Object ctx, ReasonForFinish reason) {
// If the scan finished because no more messages are present, its ok
// to leave the stubs in place because they will get filled in as
@@ -501,6 +509,7 @@ public class ReadAheadCache implements P
protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
+ @Override
public Set<CacheKey> newInstance() {
return new HashSet<CacheKey>();
}
@@ -509,6 +518,7 @@ public class ReadAheadCache implements P
protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
protected final static TreeSetLongFactory instance = new TreeSetLongFactory();
+ @Override
public SortedSet<Long> newInstance() {
return new TreeSet<Long>();
}
@@ -527,7 +537,6 @@ public class ReadAheadCache implements P
logger.debug("Adding msg {} to readahead cache", cacheKey);
CacheValue cacheValue;
-
if ((cacheValue = cache.get(cacheKey)) == null) {
cacheValue = new CacheValue();
CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue);
@@ -539,11 +548,13 @@ public class ReadAheadCache implements P
}
CacheSegment segment = cacheSegment.get();
- int size = message.getBody().size();
+ if (cacheValue.isStub()) { // update cache size only when cache value is a stub
+ int size = message.getBody().size();
- // update the cache size
- segment.presentSegmentSize.addAndGet(size);
- presentCacheSize.addAndGet(size);
+ // update the cache size
+ segment.presentSegmentSize.addAndGet(size);
+ presentCacheSize.addAndGet(size);
+ }
synchronized (cacheValue) {
// finally add the message to the cache
@@ -663,6 +674,7 @@ public class ReadAheadCache implements P
* on the callbacks registered for that stub, and delete the entry from
* the cache
*/
+ @Override
public void performRequest() {
removeMessageFromCache(cacheKey, exception,
// maintainTimeIndex=
@@ -696,6 +708,7 @@ public class ReadAheadCache implements P
this.request = request;
}
+ @Override
public void performRequest() {
// cancel scan request
cancelScanRequest(request.getScanRequest());
@@ -732,6 +745,7 @@ public class ReadAheadCache implements P
this.message = message;
}
+ @Override
public void performRequest() {
addMessageToCache(cacheKey, message, MathUtils.now());
}
@@ -747,6 +761,7 @@ public class ReadAheadCache implements P
this.seqId = seqId;
}
+ @Override
public void performRequest() {
SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic);
if (orderedSeqIds == null) {
@@ -795,6 +810,7 @@ public class ReadAheadCache implements P
* underlying persistence manager.
*/
+ @Override
public void performRequest() {
RangeScanRequest readAheadRequest = doReadAhead(request);
@@ -805,7 +821,7 @@ public class ReadAheadCache implements P
CacheValue cacheValue = cache.get(cacheKey);
if (null == cacheValue) {
logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey);
- // reissue the request
+ // reissue the request
scanSingleMessage(request);
return;
}