You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/04 18:54:25 UTC

svn commit: r1417066 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/

Author: ivank
Date: Tue Dec  4 17:54:23 2012
New Revision: 1417066

URL: http://svn.apache.org/viewvc?rev=1417066&view=rev
Log:
BOOKKEEPER-461: Delivery throughput degrades when there are lots of publishers w/ high traffic. (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Dec  4 17:54:23 2012
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-442: Failed to deliver messages due to inconsistency between SubscriptionState and LedgerRanges. (jiannan via ivank)
 
+        BOOKKEEPER-461: Delivery throughput degrades when there are lots of publishers w/ high traffic. (sijie via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Tue Dec  4 17:54:23 2012
@@ -66,6 +66,7 @@ public class ServerConfiguration extends
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
     protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
         "default_message_window_size";
+    protected final static String NUM_READAHEAD_CACHE_THREADS = "num_readahead_cache_threads";
 
     protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
 
@@ -380,6 +381,15 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Get number of read ahead cache threads.
+     *
+     * @return number of read ahead cache threads; defaults to the number of available processors when not configured.
+     */
+    public int getNumReadAheadCacheThreads() {
+        return conf.getInt(NUM_READAHEAD_CACHE_THREADS, Runtime.getRuntime().availableProcessors());
+    }
+
+    /**
      * Whether enable metadata manager based topic manager.
      *
      * @return true if enabled metadata manager based topic manager.

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Tue Dec  4 17:54:23 2012
@@ -28,8 +28,12 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hedwig.protocol.PubSubProtocol;
 import org.slf4j.Logger;
@@ -38,6 +42,8 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.ByteString;
 
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -68,7 +74,8 @@ public class ReadAheadCache implements P
     /**
      * The structure for the cache
      */
-    protected Map<CacheKey, CacheValue> cache = new HashMap<CacheKey, CacheValue>();
+    protected ConcurrentMap<CacheKey, CacheValue> cache =
+        new ConcurrentHashMap<CacheKey, CacheValue>();
 
     /**
      * To simplify synchronization, the cache will be maintained by a single
@@ -87,13 +94,14 @@ public class ReadAheadCache implements P
      * We also want to track the entries in seq-id order so that we can clean up
      * entries after the last subscriber
      */
-    protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new HashMap<ByteString, SortedSet<Long>>();
+    protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId =
+        new HashMap<ByteString, SortedSet<Long>>();
 
     /**
      * We maintain an estimate of the current size of the cache, so that we know
      * when to evict entries.
      */
-    protected long presentCacheSize = 0;
+    protected AtomicLong presentCacheSize = new AtomicLong(0);
 
     /**
      * One instance of a callback that we will pass to the underlying
@@ -111,7 +119,10 @@ public class ReadAheadCache implements P
     protected Thread cacheThread;
     // Boolean indicating if this thread should continue running. This is used
     // when we want to stop the thread during a PubSubServer shutdown.
-    protected boolean keepRunning = true;
+    protected volatile boolean keepRunning = true;
+
+    protected final OrderedSafeExecutor cacheWorkers;
+    protected final long maxCacheSize;
 
     // JMX Beans
     ReadAheadCacheBean jmxCacheBean = null;
@@ -125,6 +136,8 @@ public class ReadAheadCache implements P
         this.realPersistenceManager = realPersistenceManager;
         this.cfg = cfg;
         cacheThread = new Thread(this, "CacheThread");
+        cacheWorkers = new OrderedSafeExecutor(cfg.getNumReadAheadCacheThreads());
+        maxCacheSize = cfg.getMaximumCacheSize();
     }
 
     public ReadAheadCache start() {
@@ -205,11 +218,28 @@ public class ReadAheadCache implements P
             // cache
             CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), resultOfOperation.getLocalComponent());
 
-            enqueueWithoutFailure(new ScanResponse(cacheKey, messageWithLocalSeqId));
+            enqueueWithoutFailureByTopic(cacheKey.getTopic(),
+                    new ScanResponse(cacheKey, messageWithLocalSeqId));
         }
 
     }
 
+    protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
+        if (!keepRunning) {
+            return;
+        }
+        try {
+            cacheWorkers.submitOrdered(topic, new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    obj.performRequest();
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            logger.error("Failed to submit cache request for topic " + topic.toStringUtf8() + " : ", ree);
+        }
+    }
+
     /**
      * Too complicated to deal with enqueue failures from the context of our
      * callbacks. Its just simpler to quit and restart afresh. Moreover, this
@@ -234,7 +264,8 @@ public class ReadAheadCache implements P
      */
     public void scanSingleMessage(ScanRequest request) {
         // Let the scan requests be serialized through the queue
-        enqueueWithoutFailure(new ScanRequestWrapper(request));
+        enqueueWithoutFailureByTopic(request.getTopic(),
+                new ScanRequestWrapper(request));
     }
 
     /**
@@ -358,7 +389,10 @@ public class ReadAheadCache implements P
                 break;
             }
             CacheValue cacheValue = new CacheValue();
-            cache.put(cacheKey, cacheValue);
+            if (null != cache.putIfAbsent(cacheKey, cacheValue)) {
+                logger.warn("It is unexpected that more than one threads are adding message to cache key {}"
+                            +" at the same time.", cacheKey);
+            }
 
             logger.debug("Adding cache stub for: {}", cacheKey);
             installedStubs.add(cacheKey);
@@ -405,7 +439,7 @@ public class ReadAheadCache implements P
             // Any message we read is potentially useful for us, so lets first
             // enqueue it
             CacheKey cacheKey = new CacheKey(topic, message.getMsgId().getLocalComponent());
-            enqueueWithoutFailure(new ScanResponse(cacheKey, message));
+            enqueueWithoutFailureByTopic(topic, new ScanResponse(cacheKey, message));
 
             // Now lets see if this message is the one we were expecting
             CacheKey expectedKey = installedStubs.peek();
@@ -453,7 +487,8 @@ public class ReadAheadCache implements P
         private void enqueueDeleteOfRemainingStubs(Exception reason) {
             CacheKey installedStub;
             while ((installedStub = installedStubs.poll()) != null) {
-                enqueueWithoutFailure(new ExceptionOnCacheKey(installedStub, reason));
+                enqueueWithoutFailureByTopic(installedStub.getTopic(),
+                        new ExceptionOnCacheKey(installedStub, reason));
             }
         }
     }
@@ -482,57 +517,87 @@ public class ReadAheadCache implements P
      * @param cacheKey
      * @param message
      */
-    protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
+    protected void addMessageToCache(final CacheKey cacheKey,
+                                     final Message message, final long currTime) {
         logger.debug("Adding msg {} to readahead cache", cacheKey);
 
         CacheValue cacheValue;
 
         if ((cacheValue = cache.get(cacheKey)) == null) {
             cacheValue = new CacheValue();
-            cache.put(cacheKey, cacheValue);
+            CacheValue oldValue = cache.putIfAbsent(cacheKey, cacheValue);
+            if (null != oldValue) {
+                logger.warn("Weird! Should not have two threads adding message to cache key {} at the same time.",
+                            cacheKey);
+                cacheValue = oldValue;
+            }
         }
 
         // update the cache size
-        presentCacheSize += message.getBody().size();
-
-        // maintain the time index of addition
-        MapMethods.addToMultiMap(timeIndexOfAddition, currTime, cacheKey, HashSetCacheKeyFactory.instance);
+        final long newCacheSize = presentCacheSize.addAndGet(message.getBody().size());
 
-        // maintain the index of seq-id
-        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
-                                 TreeSetLongFactory.instance);
-
-        // finally add the message to the cache
-        cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+        synchronized (cacheValue) {
+            // finally add the message to the cache
+            cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+        }
 
         // if overgrown, collect old entries
-        collectOldCacheEntries();
-    }
-
-    protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
-                                          boolean maintainSeqIdIndex) {
+        enqueueWithoutFailure(new CacheRequest() {
+            @Override
+            public void performRequest() {
+                // maintain the index of seq-id
+                MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+                                         cacheKey.getSeqId(), TreeSetLongFactory.instance);
+
+                // maintain the time index of addition
+                MapMethods.addToMultiMap(timeIndexOfAddition, currTime,
+                                         cacheKey, HashSetCacheKeyFactory.instance);
+                // if the cache has overgrown, collect old entries
+                if (newCacheSize > maxCacheSize) {
+                    collectOldCacheEntries();
+                }
+            }
+        });
+    }
+
+    protected void removeMessageFromCache(final CacheKey cacheKey, Exception exception,
+                                          final boolean maintainTimeIndex,
+                                          final boolean maintainSeqIdIndex) {
         CacheValue cacheValue = cache.remove(cacheKey);
 
         if (cacheValue == null) {
             return;
         }
 
-        if (cacheValue.isStub()) {
-            cacheValue.setErrorAndInvokeCallbacks(exception);
-            // Stubs are not present in the indexes, so dont need to maintain
-            // indexes here
-            return;
-        }
-
-        presentCacheSize -= cacheValue.getMessage().getBody().size();
+        long timeOfAddition = 0;
+        synchronized (cacheValue) {
+            if (cacheValue.isStub()) {
+                cacheValue.setErrorAndInvokeCallbacks(exception);
+                // Stubs are not present in the indexes, so don't need to maintain
+                // indexes here
+                return;
+            }
 
-        // maintain the 2 indexes
-        // TODO: can we maintain these lazily?
-        if (maintainSeqIdIndex) {
-            MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId());
+            presentCacheSize.addAndGet(0 - cacheValue.getMessage().getBody().size());
+            timeOfAddition = cacheValue.getTimeOfAddition();
         }
-        if (maintainTimeIndex) {
-            MapMethods.removeFromMultiMap(timeIndexOfAddition, cacheValue.getTimeOfAddition(), cacheKey);
+
+        // maintain the 2 indexes lazily
+        if (maintainSeqIdIndex || maintainTimeIndex) {
+            final long additionTime = timeOfAddition;
+            enqueueWithoutFailure(new CacheRequest() {
+                @Override
+                public void performRequest() {
+                    if (maintainSeqIdIndex) {
+                        MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+                                                      cacheKey.getSeqId());
+                    }
+                    if (maintainTimeIndex) {
+                        MapMethods.removeFromMultiMap(timeIndexOfAddition, additionTime,
+                                                      cacheKey);
+                    }
+                }
+            });
         }
     }
 
@@ -541,9 +606,8 @@ public class ReadAheadCache implements P
      * oldest to newest.
      */
     protected void collectOldCacheEntries() {
-        long maxCacheSize = cfg.getMaximumCacheSize();
-
-        while (presentCacheSize > maxCacheSize && !timeIndexOfAddition.isEmpty()) {
+        while (presentCacheSize.get() > cfg.getMaximumCacheSize () &&
+               !timeIndexOfAddition.isEmpty()) {
             Long earliestTime = timeIndexOfAddition.firstKey();
             Set<CacheKey> oldCacheEntries = timeIndexOfAddition.get(earliestTime);
 
@@ -551,7 +615,7 @@ public class ReadAheadCache implements P
             // index. Hence there can be no callbacks pending on these cache
             // entries. Hence safe to remove them directly.
             for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
-                CacheKey cacheKey = iter.next();
+                final CacheKey cacheKey = iter.next();
 
                 logger.debug("Removing {} from cache because it's the oldest.", cacheKey);
                 removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
@@ -562,7 +626,6 @@ public class ReadAheadCache implements P
             }
 
             timeIndexOfAddition.remove(earliestTime);
-
         }
     }
 
@@ -691,11 +754,20 @@ public class ReadAheadCache implements P
 
             // Read ahead must have installed at least a stub for us, so this
             // can't be null
-            CacheValue cacheValue = cache.get(new CacheKey(request.getTopic(), request.getStartSeqId()));
+            CacheKey cacheKey = new CacheKey(request.getTopic(), request.getStartSeqId());
+            CacheValue cacheValue = cache.get(cacheKey);
+            if (null == cacheValue) {
+                logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey);
+                // the cache entry was removed before our callback could be attached, so reissue the request
+                scanSingleMessage(request);
+                return;
+            }
 
-            // Add our callback to the stub. If the cache value was already a
-            // concrete message, the callback will be called right away
-            cacheValue.addCallback(request.getCallback(), request.getCtx());
+            synchronized (cacheValue) {
+                // Add our callback to the stub. If the cache value was already a
+                // concrete message, the callback will be called right away
+                cacheValue.addCallback(request.getCallback(), request.getCtx());
+            }
 
             if (readAheadRequest != null) {
                 realPersistenceManager.scanMessages(readAheadRequest);
@@ -709,6 +781,7 @@ public class ReadAheadCache implements P
         // thread.
         public void performRequest() {
             keepRunning = false;
+            cacheWorkers.shutdown();
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCacheBean.java Tue Dec  4 17:54:23 2012
@@ -48,7 +48,7 @@ public class ReadAheadCacheBean implemen
 
     @Override
     public long getPresentCacheSize() {
-        return cache.presentCacheSize;
+        return cache.presentCacheSize.get();
     }
 
     @Override

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=1417066&r1=1417065&r2=1417066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Tue Dec  4 17:54:23 2012
@@ -27,6 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.HelperMethods;
 import org.apache.hedwig.StubCallback;
 import org.apache.hedwig.StubScanCallback;
@@ -53,6 +54,12 @@ public class TestReadAheadCacheWhiteBox 
             // make it perform in the same thread
             obj.performRequest();
         }
+
+        @Override
+        protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
+            // make it perform in the same thread
+            obj.performRequest();
+        }
     }
 
     class MyServerConfiguration extends ServerConfiguration {
@@ -145,7 +152,7 @@ public class TestReadAheadCacheWhiteBox 
         for (Message m : messages) {
             persistMessage(m);
         }
-        assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+        assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
         long middle = messages.size() / 2;
         cacheBasedPersistenceManager.deliveredUntil(topic, middle);
 
@@ -162,7 +169,7 @@ public class TestReadAheadCacheWhiteBox 
         assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
         assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize);
+        assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize.get());
 
     }
 
@@ -231,9 +238,9 @@ public class TestReadAheadCacheWhiteBox 
     @Test
     public void testAddMessageToCache() {
         CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
         assertEquals(1, cacheBasedPersistenceManager.cache.size());
-        assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+        assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
         assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
 
@@ -244,7 +251,7 @@ public class TestReadAheadCacheWhiteBox 
     @Test
     public void testRemoveMessageFromCache() {
         CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
         cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true);
         assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
         assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());