You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/03/25 17:26:48 UTC
svn commit: r1460751 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/common/
hedwig-server/src/main/java/org/apache/hedwig/server/delivery/
hedwig-server/src/main/java/org/apache/hedwig/server/netty/ hedwig-ser...
Author: ivank
Date: Mon Mar 25 16:26:48 2013
New Revision: 1460751
URL: http://svn.apache.org/r1460751
Log:
BOOKKEEPER-506: Provide better topic release algorithm (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Mar 25 16:26:48 2013
@@ -14,6 +14,8 @@ Trunk (unreleased changes)
BOOKKEEPER-586: Remove recursive call in delivery manager (sijie via ivank)
+ BOOKKEEPER-506: Provide better topic release algorithm (sijie via ivank)
+
bookkeeper-server:
BOOKKEEPER-567: ReadOnlyBookieTest hangs on shutdown (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Mon Mar 25 16:26:48 2013
@@ -57,7 +57,10 @@ public class ServerConfiguration extends
protected final static String PASSWORD = "password";
protected final static String SSL_ENABLED = "ssl_enabled";
protected final static String CONSUME_INTERVAL = "consume_interval";
+ protected final static String INIT_NUM_TOPICS = "init_num_topics";
+ protected final static String MAX_NUM_TOPICS = "max_num_topics";
protected final static String RETENTION_SECS = "retention_secs";
+ protected final static String RETENTION_SECS_AFTER_ACCESS = "retention_secs_after_access";
protected final static String INTER_REGION_SSL_ENABLED = "inter_region_ssl_enabled";
protected final static String MESSAGES_CONSUMED_THREAD_RUN_INTERVAL = "messages_consumed_thread_run_interval";
protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
@@ -388,6 +391,36 @@ public class ServerConfiguration extends
}
/**
+ * Specifies that the topic should be automatically released
+ * once a fixed duration has elapsed after the topic is owned, a message is
+ * published, or a message is delivered.
+ *
+ * @return the length of time, in seconds, after a topic is last accessed
+ * at which it should be automatically released.
+ */
+ public int getRetentionSecsAfterAccess() {
+ return conf.getInt(RETENTION_SECS_AFTER_ACCESS, 0);
+ }
+
+ /**
+ * Max number of topics for a hub server to serve.
+ *
+ * @return max number of topics for a hub server to serve.
+ */
+ public int getMaxNumTopics() {
+ return conf.getInt(MAX_NUM_TOPICS, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Initial capacity of the internal structure used to store topics.
+ *
+ * @return init number of topics for a hub server.
+ */
+ public int getInitNumTopics() {
+ return conf.getInt(INIT_NUM_TOPICS, 128);
+ }
+
+ /**
* True if SSL is enabled across regions.
*
* @return boolean
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Mar 25 16:26:48 2013
@@ -61,6 +61,7 @@ import org.apache.hedwig.server.persiste
import org.apache.hedwig.server.persistence.ReadAheadCache;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
import static org.apache.hedwig.util.VarArgs.va;
@@ -97,7 +98,7 @@ public class FIFODeliveryManager impleme
private final ReadAheadCache cache;
private final PersistenceManager persistenceMgr;
-
+ private TopicManager tm;
private ServerConfiguration cfg;
private final int numDeliveryWorkers;
@@ -257,7 +258,9 @@ public class FIFODeliveryManager impleme
- public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+ public FIFODeliveryManager(TopicManager tm, PersistenceManager persistenceMgr,
+ ServerConfiguration cfg) {
+ this.tm = tm;
this.persistenceMgr = persistenceMgr;
if (persistenceMgr instanceof ReadAheadCache) {
this.cache = (ReadAheadCache) persistenceMgr;
@@ -691,6 +694,12 @@ public class FIFODeliveryManager impleme
return;
}
+ // Only increment the topic access times when we try to deliver a message.
+ // For subscribers that have just been waiting a long time for a publish,
+ // we don't increment the topic access times, so the topic will be evicted
+ // in the future.
+ tm.incrementTopicAccessTimes(topic);
+
if (!filter.testMessage(message)) {
sendingFinished();
return;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Mar 25 16:26:48 2013
@@ -411,7 +411,7 @@ public class PubSubServer {
instantiateMetadataManagerFactory();
tm = instantiateTopicManager();
pm = instantiatePersistenceManager(tm);
- dm = new FIFODeliveryManager(pm, conf);
+ dm = new FIFODeliveryManager(tm, pm, conf);
dm.start();
sm = instantiateSubscriptionManager(tm, pm, dm);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Mon Mar 25 16:26:48 2013
@@ -29,6 +29,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TopicOpQueuer;
@@ -37,6 +41,15 @@ import org.apache.hedwig.util.CallbackUt
import org.apache.hedwig.util.HedwigSocketAddress;
public abstract class AbstractTopicManager implements TopicManager {
+
+ /**
+ * Stats for a topic. For now it is just an empty stub class.
+ */
+ static class TopicStats {
+ }
+
+ final static TopicStats STUB_TOPIC_STATS = new TopicStats();
+
/**
* My name.
*/
@@ -50,7 +63,7 @@ public abstract class AbstractTopicManag
/**
* List of topics I believe I am responsible for.
*/
- protected Set<ByteString> topics = Collections.synchronizedSet(new HashSet<ByteString>());
+ protected Cache<ByteString, TopicStats> topics;
protected TopicOpQueuer queuer;
protected ServerConfiguration cfg;
@@ -74,26 +87,69 @@ public abstract class AbstractTopicManag
}
private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
+ final boolean checkExistence;
+
public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
+ this(topic, cb, ctx, true);
+ }
+
+ ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx,
+ boolean checkExistence) {
queuer.super(topic, cb, ctx);
+ this.checkExistence = checkExistence;
}
@Override
public void run() {
- if (!topics.contains(topic)) {
- cb.operationFinished(ctx, null);
- return;
+ if (checkExistence) {
+ TopicStats stats = topics.getIfPresent(topic);
+ if (null == stats) {
+ cb.operationFinished(ctx, null);
+ return;
+ }
}
realReleaseTopic(topic, cb, ctx);
}
}
+ /**
+ * Release topic when the topic is removed from topics cache.
+ */
+ class ReleaseTopicListener implements RemovalListener<ByteString, TopicStats> {
+ @Override
+ public void onRemoval(RemovalNotification<ByteString, TopicStats> notification) {
+ if (notification.wasEvicted()) {
+ logger.info("topic {} is evicted", notification.getKey().toStringUtf8());
+ // if the topic is evicted, we need to release the topic.
+ releaseTopicInternally(notification.getKey(), false);
+ }
+ }
+ }
+
public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
throws UnknownHostException {
this.cfg = cfg;
this.queuer = new TopicOpQueuer(scheduler);
this.scheduler = scheduler;
addr = cfg.getServerAddr();
+
+ // build the topic cache
+ CacheBuilder<ByteString, TopicStats> cacheBuilder = CacheBuilder.newBuilder()
+ .maximumSize(cfg.getMaxNumTopics())
+ .initialCapacity(cfg.getInitNumTopics())
+ // TODO: change to same number as topic op queuer threads
+ .concurrencyLevel(Runtime.getRuntime().availableProcessors())
+ .removalListener(new ReleaseTopicListener());
+ if (cfg.getRetentionSecsAfterAccess() > 0) {
+ cacheBuilder.expireAfterAccess(cfg.getRetentionSecsAfterAccess(), TimeUnit.SECONDS);
+ }
+ topics = cacheBuilder.build();
+ }
+
+ @Override
+ public void incrementTopicAccessTimes(ByteString topic) {
+ // let the Guava cache handle hit counting
+ topics.getIfPresent(topic);
}
@Override
@@ -101,6 +157,30 @@ public abstract class AbstractTopicManag
listeners.add(listener);
}
+ private void releaseTopicInternally(final ByteString topic, boolean checkExistence) {
+ // Enqueue a release operation. (Recall that release
+ // doesn't "fail" even if the topic is missing.)
+ queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, new Callback<Void>() {
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("failure that should never happen when releasing topic "
+ + topic, exception);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ logger.info("successfully release of topic "
+ + topic.toStringUtf8());
+ if (logger.isDebugEnabled()) {
+ logger.debug("successfully release of topic "
+ + topic.toStringUtf8());
+ }
+ }
+
+ }, null, checkExistence));
+ }
+
protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
@@ -108,30 +188,12 @@ public abstract class AbstractTopicManag
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
- topics.add(topic);
+ topics.put(topic, STUB_TOPIC_STATS);
if (cfg.getRetentionSecs() > 0) {
scheduler.schedule(new Runnable() {
@Override
public void run() {
- // Enqueue a release operation. (Recall that release
- // doesn't "fail" even if the topic is missing.)
- releaseTopic(topic, new Callback<Void>() {
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("failure that should never happen when periodically releasing topic "
- + topic, exception);
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- if (logger.isDebugEnabled()) {
- logger.debug("successful periodic release of topic "
- + topic.toStringUtf8());
- }
- }
-
- }, null);
+ releaseTopicInternally(topic, true);
}
}, cfg.getRetentionSecs(), TimeUnit.SECONDS);
}
@@ -164,7 +226,7 @@ public abstract class AbstractTopicManag
private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
for (TopicOwnershipChangeListener listener : listeners)
listener.lostTopic(topic);
- topics.remove(topic);
+ topics.invalidate(topic);
postReleaseCleanup(topic, callback, ctx);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Mon Mar 25 16:26:48 2013
@@ -120,7 +120,8 @@ public class MMTopicManager extends Abst
return;
}
- if (topics.contains(topic)) {
+ TopicStats stats = topics.getIfPresent(topic);
+ if (null != stats) {
cb.operationFinished(ctx, addr);
return;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Mon Mar 25 16:26:48 2013
@@ -49,6 +49,14 @@ public interface TopicManager {
Callback<HedwigSocketAddress> cb, Object ctx);
/**
+ * Increment the number of access times for a given <code>topic</code>.
+ *
+ * @param topic
+ * Topic Name.
+ */
+ public void incrementTopicAccessTimes(ByteString topic);
+
+ /**
* Whenever the topic manager finds out that the set of topics owned by this
* node has changed, it can notify a set of
* {@link TopicOwnershipChangeListener} objects. Any component of the system
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java Mon Mar 25 16:26:48 2013
@@ -36,7 +36,8 @@ public class TrivialOwnAllTopicManager e
protected void realGetOwner(ByteString topic, boolean shouldClaim,
Callback<HedwigSocketAddress> cb, Object ctx) {
- if (topics.contains(topic)) {
+ TopicStats stats = topics.getIfPresent(topic);
+ if (null != stats) {
cb.operationFinished(ctx, addr);
return;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Mon Mar 25 16:26:48 2013
@@ -134,7 +134,8 @@ public class ZkTopicManager extends Abst
return;
}
- if (topics.contains(topic)) {
+ TopicStats stats = topics.getIfPresent(topic);
+ if (null != stats) {
cb.operationFinished(ctx, addr);
return;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestFIFODeliveryManager.java Mon Mar 25 16:26:48 2013
@@ -37,6 +37,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.StubTopicManager;
import org.apache.hedwig.server.persistence.PersistRequest;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.StubPersistenceManager;
@@ -120,7 +121,7 @@ public class TestFIFODeliveryManager {
ByteString subscriber = ByteString.copyFromUtf8("subRaceSubscriber");
PersistenceManager pm = new StubPersistenceManager();
- FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+ FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
ExecutorDeliveryEndPointWithQueue dep = new ExecutorDeliveryEndPointWithQueue();
SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
@@ -251,7 +252,7 @@ public class TestFIFODeliveryManager {
ByteString subscriber = ByteString.copyFromUtf8("throttlingRaceSubscriber");
PersistenceManager pm = new StubPersistenceManager();
- FIFODeliveryManager fdm = new FIFODeliveryManager(pm, conf);
+ FIFODeliveryManager fdm = new FIFODeliveryManager(new StubTopicManager(conf), pm, conf);
ExecutorDeliveryEndPoint dep = new ExecutorDeliveryEndPoint(fdm);
SubscriptionPreferences prefs = SubscriptionPreferences.newBuilder().build();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Mon Mar 25 16:26:48 2013
@@ -51,7 +51,7 @@ public class StubTopicManager extends Tr
cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
return;
}
- if (topics.contains(topic) // already own it
+ if (null != topics.getIfPresent(topic) // already own it
|| shouldOwnEveryNewTopic) {
super.realGetOwner(topic, shouldClaim, cb, ctx);
return;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java Mon Mar 25 16:26:48 2013
@@ -276,7 +276,7 @@ public class TestMMTopicManager extends
Assert.assertTrue(pair.second());
Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
.getExceptions().iterator().next().getClass());
- Assert.assertFalse(tm.topics.contains(topic));
+ Assert.assertFalse(null != tm.topics.getIfPresent(topic));
Thread.sleep(100);
assertOwnershipNodeDoesntExist();
@@ -318,4 +318,40 @@ public class TestMMTopicManager extends
assertOwnershipNodeExists();
}
+ @Test(timeout=60000)
+ public void testRetentionAfterAccess() throws Exception {
+ conf.getConf().setProperty("retention_secs_after_access", "5");
+ MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(6000L);
+ tm1.topics.cleanUp();
+ Thread.sleep(2000L);
+ assertOwnershipNodeDoesntExist();
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(1000L);
+ tm1.topics.cleanUp();
+ Thread.sleep(2000L);
+ assertOwnershipNodeExists();
+
+ tm1.stop();
+ }
+
+ @Test(timeout=60000)
+ public void testMaxNumTopics() throws Exception {
+ conf.getConf().setProperty("max_num_topics", "1");
+ MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ assertOwnershipNodeExists();
+ tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
+ true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(2000L);
+ assertOwnershipNodeDoesntExist();
+ tm1.stop();
+ }
+
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=1460751&r1=1460750&r2=1460751&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Mon Mar 25 16:26:48 2013
@@ -88,11 +88,23 @@ public class TestZkTopicManager extends
protected HedwigSocketAddress me;
protected ScheduledExecutorService scheduler;
+ private volatile int DEFAULT_MAX_NUM_TOPICS = Integer.MAX_VALUE;
+ private volatile int DEFAULT_RETENTION_SECS_AFTER_ACCESS = 0;
+
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- cfg = new ServerConfiguration();
+ cfg = new ServerConfiguration() {
+ @Override
+ public int getRetentionSecsAfterAccess() {
+ return DEFAULT_RETENTION_SECS_AFTER_ACCESS;
+ }
+ @Override
+ public int getMaxNumTopics() {
+ return DEFAULT_MAX_NUM_TOPICS;
+ }
+ };
me = cfg.getServerAddr();
scheduler = Executors.newSingleThreadScheduledExecutor();
tm = new ZkTopicManager(zk, cfg, scheduler);
@@ -283,7 +295,7 @@ public class TestZkTopicManager extends
Assert.assertTrue(pair.second());
Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
.getExceptions().iterator().next().getClass());
- Assert.assertFalse(tm.topics.contains(topic));
+ Assert.assertFalse(null != tm.topics.getIfPresent(topic));
Thread.sleep(100);
assertOwnershipNodeDoesntExist();
@@ -326,4 +338,39 @@ public class TestZkTopicManager extends
assertOwnershipNodeExists();
}
+ @Test(timeout=60000)
+ public void testRetentionAfterAccess() throws Exception {
+ DEFAULT_RETENTION_SECS_AFTER_ACCESS = 5;
+ ZkTopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(6000L);
+ tm1.topics.cleanUp();
+ Thread.sleep(2000L);
+ assertOwnershipNodeDoesntExist();
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(1000L);
+ tm1.topics.cleanUp();
+ Thread.sleep(2000L);
+ assertOwnershipNodeExists();
+
+ tm1.stop();
+ }
+
+ @Test(timeout=60000)
+ public void testMaxNumTopics() throws Exception {
+ DEFAULT_MAX_NUM_TOPICS = 1;
+ TopicManager tm1 = new ZkTopicManager(zk, cfg, scheduler);
+ tm1.getOwner(topic, true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ assertOwnershipNodeExists();
+ tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
+ true, addrCbq, null);
+ Assert.assertEquals(me, check(addrCbq.take()));
+ Thread.sleep(2000L);
+ assertOwnershipNodeDoesntExist();
+ tm1.stop();
+ }
+
}