You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2013/07/13 17:23:25 UTC

svn commit: r1502803 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java

Author: fpj
Date: Sat Jul 13 15:23:25 2013
New Revision: 1502803

URL: http://svn.apache.org/r1502803
Log:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1502803&r1=1502802&r2=1502803&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat Jul 13 15:23:25 2013
@@ -70,6 +70,10 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-633: ConcurrentModificationException in RackawareEnsemblePlacementPolicy when a bookie is removed from available list (vinay via sijie)
 
+      hedwig-server:
+
+        BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
+
       hedwig-client:
 
         BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1502803&r1=1502802&r2=1502803&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Sat Jul 13 15:23:25 2013
@@ -17,11 +17,9 @@
  */
 package org.apache.hedwig.server.persistence;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -30,33 +28,28 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protoextensions.MessageIdUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.UnexpectedError;
 import org.apache.hedwig.server.jmx.HedwigJMXService;
 import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
 import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
-import org.apache.hedwig.server.persistence.ReadAheadCacheBean;
 import org.apache.hedwig.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
 
 public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
 
@@ -186,10 +179,12 @@ public class ReadAheadCache implements P
      * the real persistence manager.
      */
 
+    @Override
     public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
         return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount);
     }
 
+    @Override
     public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
         return realPersistenceManager.getCurrentSeqIdForTopic(topic);
     }
@@ -203,6 +198,7 @@ public class ReadAheadCache implements P
      * our callback on the return path
      *
      */
+    @Override
     public void persistMessage(PersistRequest request) {
         // make a new PersistRequest object so that we can insert our own
         // callback in the middle. Assign the original request as the context
@@ -225,6 +221,7 @@ public class ReadAheadCache implements P
          * In case there is a failure in persisting, just pass it to the
          * original callback
          */
+        @Override
         public void operationFailed(Object ctx, PubSubException exception) {
             PersistRequest originalRequest = (PersistRequest) ctx;
             Callback<PubSubProtocol.MessageSeqId> originalCallback = originalRequest.getCallback();
@@ -237,6 +234,7 @@ public class ReadAheadCache implements P
          * success, and then opportunistically treat the message as if it just
          * came in through a scan
          */
+        @Override
         public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
             PersistRequest originalRequest = (PersistRequest) ctx;
 
@@ -283,6 +281,7 @@ public class ReadAheadCache implements P
      * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
      * the scan request and let the cache maintainer thread handle it.
      */
+    @Override
     public void scanSingleMessage(ScanRequest request) {
         // Let the scan requests be serialized through the queue
         enqueueWithoutFailureByTopic(request.getTopic(),
@@ -295,6 +294,7 @@ public class ReadAheadCache implements P
      * 3. Enqueue the request so that the cache maintainer thread can delete all
      * message-ids older than the one specified
      */
+    @Override
     public void deliveredUntil(ByteString topic, Long seqId) {
         enqueueWithoutFailureByTopic(topic, new DeliveredUntil(topic, seqId));
     }
@@ -308,18 +308,22 @@ public class ReadAheadCache implements P
      * get aged out of the cache eventually. For now, there is no need to
      * proactively remove those entries from the cache.
      */
+    @Override
     public void consumedUntil(ByteString topic, Long seqId) {
         realPersistenceManager.consumedUntil(topic, seqId);
     }
 
+    @Override
     public void setMessageBound(ByteString topic, Integer bound) {
         realPersistenceManager.setMessageBound(topic, bound);
     }
 
+    @Override
     public void clearMessageBound(ByteString topic) {
         realPersistenceManager.clearMessageBound(topic);
     }
 
+    @Override
     public void consumeToBound(ByteString topic) {
         realPersistenceManager.consumeToBound(topic);
     }
@@ -327,6 +331,7 @@ public class ReadAheadCache implements P
     /**
      * Stop the readahead cache.
      */
+    @Override
     public void stop() {
         try {
             keepRunning = false;
@@ -439,6 +444,7 @@ public class ReadAheadCache implements P
             this.topic = topic;
         }
 
+        @Override
         public void messageScanned(Object ctx, Message message) {
 
             // Any message we read is potentially useful for us, so lets first
@@ -473,10 +479,12 @@ public class ReadAheadCache implements P
 
         }
 
+        @Override
         public void scanFailed(Object ctx, Exception exception) {
             enqueueDeleteOfRemainingStubs(exception);
         }
 
+        @Override
         public void scanFinished(Object ctx, ReasonForFinish reason) {
             // If the scan finished because no more messages are present, its ok
             // to leave the stubs in place because they will get filled in as
@@ -501,6 +509,7 @@ public class ReadAheadCache implements P
     protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
         protected final static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
 
+        @Override
         public Set<CacheKey> newInstance() {
             return new HashSet<CacheKey>();
         }
@@ -509,6 +518,7 @@ public class ReadAheadCache implements P
     protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
         protected final static TreeSetLongFactory instance = new TreeSetLongFactory();
 
+        @Override
         public SortedSet<Long> newInstance() {
             return new TreeSet<Long>();
         }
@@ -527,7 +537,6 @@ public class ReadAheadCache implements P
         logger.debug("Adding msg {} to readahead cache", cacheKey);
 
         CacheValue cacheValue;
-
         if ((cacheValue = cache.get(cacheKey)) == null) {
             cacheValue = new CacheValue();
             CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue);
@@ -539,11 +548,13 @@ public class ReadAheadCache implements P
         }
 
         CacheSegment segment = cacheSegment.get();
-        int size = message.getBody().size();
+        if (cacheValue.isStub()) { // update cache size only when cache value is a stub
+            int size = message.getBody().size();
 
-        // update the cache size
-        segment.presentSegmentSize.addAndGet(size);
-        presentCacheSize.addAndGet(size);
+            // update the cache size
+            segment.presentSegmentSize.addAndGet(size);
+            presentCacheSize.addAndGet(size);
+        }
 
         synchronized (cacheValue) {
             // finally add the message to the cache
@@ -663,6 +674,7 @@ public class ReadAheadCache implements P
          * on the callbacks registered for that stub, and delete the entry from
          * the cache
          */
+        @Override
         public void performRequest() {
             removeMessageFromCache(cacheKey, exception,
                                    // maintainTimeIndex=
@@ -696,6 +708,7 @@ public class ReadAheadCache implements P
             this.request = request;
         }
 
+        @Override
         public void performRequest() {
             // cancel scan request
             cancelScanRequest(request.getScanRequest());
@@ -732,6 +745,7 @@ public class ReadAheadCache implements P
             this.message = message;
         }
 
+        @Override
         public void performRequest() {
             addMessageToCache(cacheKey, message, MathUtils.now());
         }
@@ -747,6 +761,7 @@ public class ReadAheadCache implements P
             this.seqId = seqId;
         }
 
+        @Override
         public void performRequest() {
             SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic);
             if (orderedSeqIds == null) {
@@ -795,6 +810,7 @@ public class ReadAheadCache implements P
          * underlying persistence manager.
          */
 
+        @Override
         public void performRequest() {
 
             RangeScanRequest readAheadRequest = doReadAhead(request);
@@ -805,7 +821,7 @@ public class ReadAheadCache implements P
             CacheValue cacheValue = cache.get(cacheKey);
             if (null == cacheValue) {
                 logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey);
-                // reissue the request 
+                // reissue the request
                 scanSingleMessage(request);
                 return;
             }