You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/03/25 17:26:48 UTC

svn commit: r1460751 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/delivery/ hedwig-server/src/main/java/org/apache/hedwig/server/netty/ hedwig-ser...

Author: ivank
Date: Mon Mar 25 16:26:48 2013
New Revision: 1460751

URL: http://svn.apache.org/r1460751
Log:
BOOKKEEPER-506: Provide better topic release algorithm (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Mar 25 16:26:48 2013
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-586: Remove recursive call in delivery manager (sijie via ivank)
 
+      BOOKKEEPER-506: Provide better topic release algorithm (sijie via ivank)
+
       bookkeeper-server:
 
         BOOKKEEPER-567: ReadOnlyBookieTest hangs on shutdown (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Mon Mar 25 16:26:48 2013
@@ -57,7 +57,10 @@ public class ServerConfiguration extends
     protected final static String PASSWORD = "password";
     protected final static String SSL_ENABLED = "ssl_enabled";
     protected final static String CONSUME_INTERVAL = "consume_interval";
+    protected final static String INIT_NUM_TOPICS = "init_num_topics";
+    protected final static String MAX_NUM_TOPICS = "max_num_topics";
     protected final static String RETENTION_SECS = "retention_secs";
+    protected final static String RETENTION_SECS_AFTER_ACCESS = "retention_secs_after_access";
     protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
     protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
     protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
@@ -388,6 +391,36 @@ public class ServerConfiguration extends
     }
 
     /**
+     * Specifies that the topic should be automatically released
+     * once a fixed duration after the topic is owned, a message is
+     * published, or a message is delivered.
+     *
+     * @return the length of time after an entry is last accessed that
+     *         it should be automatically removed.
+     */
+    public int getRetentionSecsAfterAccess() {
+        return conf.getInt(RETENTION_SECS_AFTER_ACCESS, 0);
+    }
+
+    /**
+     * Max number of topics for a hub server to serve.
+     *
+     * @return max number of topics for a hub server to serve.
+     */
+    public int getMaxNumTopics() {
+        return conf.getInt(MAX_NUM_TOPICS, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Minimum size of internal structure to store topics.
+     *
+     * @return init number of topics for a hub server.
+     */
+    public int getInitNumTopics() {
+        return conf.getInt(INIT_NUM_TOPICS, 128);
+    }
+
+    /**
      * True if SSL is enabled across regions.
      * 
      * @return boolean

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Mar 25 16:26:48 2013
@@ -61,6 +61,7 @@ import org.apache.hedwig.server.persiste
 import org.apache.hedwig.server.persistence.ReadAheadCache;
 import org.apache.hedwig.server.persistence.ScanCallback;
 import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
 import static org.apache.hedwig.util.VarArgs.va;
 
@@ -97,7 +98,7 @@ public class FIFODeliveryManager impleme
 
     private final ReadAheadCache cache;
     private final PersistenceManager persistenceMgr;
-
+    private TopicManager tm;
     private ServerConfiguration cfg;
 
     private final int numDeliveryWorkers;
@@ -257,7 +258,9 @@ public class FIFODeliveryManager impleme
 
 
 
-    public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+    public FIFODeliveryManager(TopicManager tm, PersistenceManager persistenceMgr,
+                               ServerConfiguration cfg) {
+        this.tm = tm;
         this.persistenceMgr = persistenceMgr;
         if (persistenceMgr instanceof ReadAheadCache) {
             this.cache = (ReadAheadCache) persistenceMgr;
@@ -691,6 +694,12 @@ public class FIFODeliveryManager impleme
                 return;
             }
 
+            // only increment topic access times when tried to deliver a message
+            // for those subscribers just waiting for a published for a long time
+            // we don't increment topic access times, so the topic would be evicted
+            // in future.
+            tm.incrementTopicAccessTimes(topic);
+
             if (!filter.testMessage(message)) {
                 sendingFinished();
                 return;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Mar 25 16:26:48 2013
@@ -411,7 +411,7 @@ public class PubSubServer {
                     instantiateMetadataManagerFactory();
                     tm = instantiateTopicManager();
                     pm = instantiatePersistenceManager(tm);
-                    dm = new FIFODeliveryManager(pm, conf);
+                    dm = new FIFODeliveryManager(tm, pm, conf);
                     dm.start();
 
                     sm = instantiateSubscriptionManager(tm, pm, dm);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Mon Mar 25 16:26:48 2013
@@ -29,6 +29,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.TopicOpQueuer;
@@ -37,6 +41,15 @@ import org.apache.hedwig.util.CallbackUt
 import org.apache.hedwig.util.HedwigSocketAddress;
 
 public abstract class AbstractTopicManager implements TopicManager {
+
+    /**
+     * Stats for a topic. For now it just an empty stub class.
+     */
+    static class TopicStats {
+    }
+
+    final static TopicStats STUB_TOPIC_STATS = new TopicStats();
+
     /**
      * My name.
      */
@@ -50,7 +63,7 @@ public abstract class AbstractTopicManag
     /**
      * List of topics I believe I am responsible for.
      */
-    protected Set<ByteString> topics = Collections.synchronizedSet(new HashSet<ByteString>());
+    protected Cache<ByteString, TopicStats> topics;
 
     protected TopicOpQueuer queuer;
     protected ServerConfiguration cfg;
@@ -74,26 +87,69 @@ public abstract class AbstractTopicManag
     }
 
     private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
+        final boolean checkExistence;
+
         public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
+            this(topic, cb, ctx, true);
+        }
+
+        ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx,
+                  boolean checkExistence) {
             queuer.super(topic, cb, ctx);
+            this.checkExistence = checkExistence;
         }
 
         @Override
         public void run() {
-            if (!topics.contains(topic)) {
-                cb.operationFinished(ctx, null);
-                return;
+            if (checkExistence) {
+                TopicStats stats = topics.getIfPresent(topic);
+                if (null == stats) {
+                    cb.operationFinished(ctx, null);
+                    return;
+                }
             }
             realReleaseTopic(topic, cb, ctx);
         }
     }
 
+    /**
+     * Release topic when the topic is removed from topics cache.
+     */
+    class ReleaseTopicListener implements RemovalListener<ByteString, TopicStats> {
+        @Override
+        public void onRemoval(RemovalNotification<ByteString, TopicStats> notification) {
+            if (notification.wasEvicted()) {
+                logger.info("topic {} is evicted", notification.getKey().toStringUtf8());
+                // if the topic is evicted, we need to release the topic.
+                releaseTopicInternally(notification.getKey(), false);
+            }
+        }
+    }
+
     public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
             throws UnknownHostException {
         this.cfg = cfg;
         this.queuer = new TopicOpQueuer(scheduler);
         this.scheduler = scheduler;
         addr = cfg.getServerAddr();
+
+        // build the topic cache
+        CacheBuilder<ByteString, TopicStats> cacheBuilder = CacheBuilder.newBuilder()
+            .maximumSize(cfg.getMaxNumTopics())
+            .initialCapacity(cfg.getInitNumTopics())
+            // TODO: change to same number as topic op queuer threads
+            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
+            .removalListener(new ReleaseTopicListener());
+        if (cfg.getRetentionSecsAfterAccess() > 0) {
+            cacheBuilder.expireAfterAccess(cfg.getRetentionSecsAfterAccess(), TimeUnit.SECONDS);
+        }
+        topics = cacheBuilder.build();
+    }
+
+    @Override
+    public void incrementTopicAccessTimes(ByteString topic) {
+        // let guava cache handle hits counting
+        topics.getIfPresent(topic);
     }
 
     @Override
@@ -101,6 +157,30 @@ public abstract class AbstractTopicManag
         listeners.add(listener);
     }
 
+    private void releaseTopicInternally(final ByteString topic, boolean checkExistence) {
+        // Enqueue a release operation. (Recall that release
+        // doesn't "fail" even if the topic is missing.)
+        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, new Callback<Void>() {
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                logger.error("failure that should never happen when releasing topic "
+                             + topic, exception);
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                    logger.info("successfully release of topic "
+                        + topic.toStringUtf8());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("successfully release of topic "
+                        + topic.toStringUtf8());
+                }
+            }
+
+        }, null, checkExistence));
+    }
+
     protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
             final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
 
@@ -108,30 +188,12 @@ public abstract class AbstractTopicManag
 
             @Override
             public void operationFinished(Object ctx, Void resultOfOperation) {
-                topics.add(topic);
+                topics.put(topic, STUB_TOPIC_STATS);
                 if (cfg.getRetentionSecs() > 0) {
                     scheduler.schedule(new Runnable() {
                         @Override
                         public void run() {
-                            // Enqueue a release operation. (Recall that release
-                            // doesn't "fail" even if the topic is missing.)
-                            releaseTopic(topic, new Callback<Void>() {
-
-                                @Override
-                                public void operationFailed(Object ctx, PubSubException exception) {
-                                    logger.error("failure that should never happen when periodically releasing topic "
-                                                 + topic, exception);
-                                }
-
-                                @Override
-                                public void operationFinished(Object ctx, Void resultOfOperation) {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("successful periodic release of topic "
-                                            + topic.toStringUtf8());
-                                    }
-                                }
-
-                            }, null);
+                            releaseTopicInternally(topic, true);
                         }
                     }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
                 }
@@ -164,7 +226,7 @@ public abstract class AbstractTopicManag
     private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
         for (TopicOwnershipChangeListener listener : listeners)
             listener.lostTopic(topic);
-        topics.remove(topic);
+        topics.invalidate(topic);
         postReleaseCleanup(topic, callback, ctx);
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Mon Mar 25 16:26:48 2013
@@ -120,7 +120,8 @@ public class MMTopicManager extends Abst
             return;
         }
 
-        if (topics.contains(topic)) {
+        TopicStats stats = topics.getIfPresent(topic);
+        if (null != stats) {
             cb.operationFinished(ctx, addr);
             return;
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Mon Mar 25 16:26:48 2013
@@ -49,6 +49,14 @@ public interface TopicManager {
                          Callback<HedwigSocketAddress> cb, Object ctx);
 
     /**
+     * Increment the number of access times for a given <code>topic</code>.
+     *
+     * @param topic
+     *          Topic Name.
+     */
+    public void incrementTopicAccessTimes(ByteString topic);
+
+    /**
      * Whenever the topic manager finds out that the set of topics owned by this
      * node has changed, it can notify a set of
      * {@link TopicOwnershipChangeListener} objects. Any component of the system

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Mon Mar 25 16:26:48 2013
@@ -36,7 +36,8 @@ public class TrivialOwnAllTopicManager e
     protected void realGetOwner(ByteString topic, boolean shouldClaim,
                                 Callback<HedwigSocketAddress> cb, Object ctx) {
 
-        if (topics.contains(topic)) {
+        TopicStats stats = topics.getIfPresent(topic);
+        if (null != stats) {
             cb.operationFinished(ctx, addr);
             return;
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Mon Mar 25 16:26:48 2013
@@ -134,7 +134,8 @@ public class ZkTopicManager extends Abst
             return;
         }
 
-        if (topics.contains(topic)) {
+        TopicStats stats = topics.getIfPresent(topic);
+        if (null != stats) {
             cb.operationFinished(ctx, addr);
             return;
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java Mon Mar 25 16:26:48 2013
@@ -37,6 +37,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.StubTopicManager;
 import org.apache.hedwig.server.persistence.PersistRequest;
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.persistence.StubPersistenceManager;
@@ -120,7 +121,7 @@ public class TestFIFODeliveryManager {
         ByteString subscriber = ByteString.copyFromUtf8("subRaceSubscriber");
 
         PersistenceManager pm = new StubPersistenceManager();
-        FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+        FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
         ExecutorDeliveryEndPointWithQueue dep = new ExecutorDeliveryEndPointWithQueue();
         SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
 
@@ -251,7 +252,7 @@ public class TestFIFODeliveryManager {
         ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");
 
         PersistenceManager pm = new StubPersistenceManager();
-        FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+        FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
         ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
         SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Mon Mar 25 16:26:48 2013
@@ -51,7 +51,7 @@ public class StubTopicManager extends Tr
             cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
             return;
         }
-        if (topics.contains(topic) // already own it
+        if (null != topics.getIfPresent(topic) // already own it
                 || shouldOwnEveryNewTopic) {
             super.realGetOwner(topic, shouldClaim, cb, ctx);
             return;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java Mon Mar 25 16:26:48 2013
@@ -276,7 +276,7 @@ public class TestMMTopicManager extends 
         Assert.assertTrue(pair.second());
         Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
                             .getExceptions().iterator().next().getClass());
-        Assert.assertFalse(tm.topics.contains(topic));
+        Assert.assertFalse(null != tm.topics.getIfPresent(topic));
         Thread.sleep(100);
         assertOwnershipNodeDoesntExist();
 
@@ -318,4 +318,40 @@ public class TestMMTopicManager extends 
         assertOwnershipNodeExists();
     }
 
+    @Test(timeout=60000)
+    public void testRetentionAfterAccess() throws Exception {
+        conf.getConf().setProperty("retention_secs_after_access", "5");
+        MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(6000L);
+        tm1.topics.cleanUp();
+        Thread.sleep(2000L);
+        assertOwnershipNodeDoesntExist();
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(1000L);
+        tm1.topics.cleanUp();
+        Thread.sleep(2000L);
+        assertOwnershipNodeExists();
+
+        tm1.stop();
+    }
+
+    @Test(timeout=60000)
+    public void testMaxNumTopics() throws Exception {
+        conf.getConf().setProperty("max_num_topics", "1");
+        MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        assertOwnershipNodeExists();
+        tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
+                     true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(2000L);
+        assertOwnershipNodeDoesntExist();
+        tm1.stop();
+    }
+
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Mon Mar 25 16:26:48 2013
@@ -88,11 +88,23 @@ public class TestZkTopicManager extends 
     protected HedwigSocketAddress me;
     protected ScheduledExecutorService scheduler;
 
+    private volatile int DEFAULT_MAX_NUM_TOPICS = Integer.MAX_VALUE;
+    private volatile int DEFAULT_RETENTION_SECS_AFTER_ACCESS = 0;
+
     @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        cfg = new ServerConfiguration();
+        cfg = new ServerConfiguration() {
+            @Override
+            public int getRetentionSecsAfterAccess() {
+                return DEFAULT_RETENTION_SECS_AFTER_ACCESS;
+            }
+            @Override
+            public int getMaxNumTopics() {
+                return DEFAULT_MAX_NUM_TOPICS;
+            }
+        };
         me = cfg.getServerAddr();
         scheduler = Executors.newSingleThreadScheduledExecutor();
         tm = new ZkTopicManager(zk, cfg, scheduler);
@@ -283,7 +295,7 @@ public class TestZkTopicManager extends 
         Assert.assertTrue(pair.second());
         Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
                             .getExceptions().iterator().next().getClass());
-        Assert.assertFalse(tm.topics.contains(topic));
+        Assert.assertFalse(null != tm.topics.getIfPresent(topic));
         Thread.sleep(100);
         assertOwnershipNodeDoesntExist();
 
@@ -326,4 +338,39 @@ public class TestZkTopicManager extends 
         assertOwnershipNodeExists();
     }
 
+    @Test(timeout=60000)
+    public void testRetentionAfterAccess() throws Exception {
+        DEFAULT_RETENTION_SECS_AFTER_ACCESS = 5;
+        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(6000L);
+        tm1.topics.cleanUp();
+        Thread.sleep(2000L);
+        assertOwnershipNodeDoesntExist();
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(1000L);
+        tm1.topics.cleanUp();
+        Thread.sleep(2000L);
+        assertOwnershipNodeExists();
+
+        tm1.stop();
+    }
+
+    @Test(timeout=60000)
+    public void testMaxNumTopics() throws Exception {
+        DEFAULT_MAX_NUM_TOPICS = 1;
+        TopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
+        tm1.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        assertOwnershipNodeExists();
+        tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
+                     true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Thread.sleep(2000L);
+        assertOwnershipNodeDoesntExist();
+        tm1.stop();
+    }
+
 }