You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:12 UTC
[02/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
deleted file mode 100644
index 52a5874..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-
-import static org.junit.Assert.*;
-
-/**
- * Black-box test harness shared by {@link PersistenceManager} implementations.
- *
- * <p>For every topic the test publishes {@code NUM_MESSAGES_TO_TEST} random messages on a
- * publisher thread and, once all publishers have finished, scans them back on a scanner thread,
- * checking that they are returned in publish order. Worker threads and callbacks cannot fail the
- * JUnit test directly, so they record problems in {@link #failureException}, which the test
- * thread asserts on at the end.
- *
- * <p>{@link #setUp()} and {@link #tearDown()} are not annotated here, so subclasses are expected
- * to hook them into JUnit (for example by overriding them with {@code @Before} / {@code @After}).
- */
-public abstract class TestPersistenceManagerBlackBox {
-    protected PersistenceManager persistenceManager;
-    protected int NUM_MESSAGES_TO_TEST = 5;
-    protected int NUM_TOPICS_TO_TEST = 5;
-    private static final Logger logger = LoggerFactory.getLogger(TestPersistenceManagerBlackBox.class);
-    TestCallback testCallback = new TestCallback();
-
-    /**
-     * Most recent failure recorded by a callback, publisher or scanner; {@code null} when none.
-     * Volatile because it is written from those threads and read by the test thread.
-     */
-    volatile RuntimeException failureException;
-
-    /**
-     * Signals completion of an asynchronous operation on the status queue passed as {@code ctx}.
-     *
-     * @param ctx the {@code LinkedBlockingQueue<Boolean>} the waiting thread polls
-     * @throws RuntimeException (also recorded in {@link #failureException}) if interrupted
-     */
-    @SuppressWarnings("unchecked")
-    private void signalCompletion(Object ctx) {
-        LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
-        try {
-            statusQueue.put(true);
-        } catch (InterruptedException e) {
-            // Keep the interrupt visible to whoever owns this thread.
-            Thread.currentThread().interrupt();
-            throw (failureException = new RuntimeException(e));
-        }
-    }
-
-    /** Persist callback: signals success on the status queue, records and rethrows failures. */
-    class TestCallback implements Callback<PubSubProtocol.MessageSeqId> {
-
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        @Override
-        public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
-            signalCompletion(ctx);
-        }
-    }
-
-    /**
-     * Scan callback for range scans: every scanned message must equal the head of the expected
-     * list, which is consumed as the scan progresses. Completion is signalled from
-     * {@link #scanFinished}.
-     */
-    class RangeScanVerifierListener implements ScanCallback {
-        List<Message> pubMsgs;
-
-        public RangeScanVerifierListener(List<Message> pubMsgs) {
-            this.pubMsgs = pubMsgs;
-        }
-
-        @Override
-        public void messageScanned(Object ctx, Message recvMessage) {
-            if (pubMsgs.isEmpty()) {
-                throw (failureException = new RuntimeException("Message received when none expected"));
-            }
-
-            Message pubMsg = pubMsgs.get(0);
-            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
-                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
-            }
-            pubMsgs.remove(0);
-        }
-
-        @Override
-        public void scanFailed(Object ctx, Exception exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        @Override
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
-                throw (failureException = new RuntimeException("Scan finished prematurely " + reason));
-            }
-            signalCompletion(ctx);
-        }
-
-    }
-
-    /**
-     * Scan callback for managers without range scan: verifies one message, then either signals
-     * completion (nothing left to expect) or issues the single-message scan for the next
-     * sequence id.
-     */
-    class PointScanVerifierListener implements ScanCallback {
-        List<Message> pubMsgs;
-        ByteString topic;
-
-        public PointScanVerifierListener(List<Message> pubMsgs, ByteString topic) {
-            this.topic = topic;
-            this.pubMsgs = pubMsgs;
-        }
-
-        @Override
-        public void messageScanned(Object ctx, Message recvMessage) {
-            // Same guard as the range verifier: report an unexpected message as a test failure
-            // instead of letting get(0) throw IndexOutOfBoundsException.
-            if (pubMsgs.isEmpty()) {
-                throw (failureException = new RuntimeException("Message received when none expected"));
-            }
-
-            Message pubMsg = pubMsgs.get(0);
-            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
-                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
-            }
-            pubMsgs.remove(0);
-
-            if (pubMsgs.isEmpty()) {
-                signalCompletion(ctx);
-            } else {
-                long seqId = recvMessage.getMsgId().getLocalComponent();
-                seqId = persistenceManager.getSeqIdAfterSkipping(topic, seqId, 1);
-                ScanRequest request = new ScanRequest(topic, seqId, new PointScanVerifierListener(pubMsgs, topic), ctx);
-                persistenceManager.scanSingleMessage(request);
-            }
-
-        }
-
-        @Override
-        public void scanFailed(Object ctx, Exception exception) {
-            throw (failureException = new RuntimeException(exception));
-        }
-
-        @Override
-        public void scanFinished(Object ctx, ReasonForFinish reason) {
-            // Completion is signalled from messageScanned once the expected list is empty.
-        }
-
-    }
-
-    /** Starts a scan of one topic and waits up to a minute for the verifier to signal completion. */
-    class ScanVerifier implements Runnable {
-        List<Message> pubMsgs;
-        ByteString topic;
-        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-
-        public ScanVerifier(ByteString topic, List<Message> pubMsgs) {
-            this.topic = topic;
-            this.pubMsgs = pubMsgs;
-        }
-
-        @Override
-        public void run() {
-            // start the scan
-            try {
-                if (persistenceManager instanceof PersistenceManagerWithRangeScan) {
-
-                    ScanCallback listener = new RangeScanVerifierListener(pubMsgs);
-
-                    PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
-
-                    // Ask for one message more than was published so that an extra message
-                    // would reach the verifier and be flagged.
-                    rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
-                            NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
-
-                } else {
-
-                    ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
-                    persistenceManager
-                            .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
-
-                }
-                // now listen for it to finish
-                // wait a maximum of a minute
-                Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
-                if (b == null) {
-                    throw (failureException = new RuntimeException("Scanning timed out"));
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw (failureException = new RuntimeException(e));
-            }
-        }
-    }
-
-    /** Persists the given messages to one topic, one at a time, waiting for each acknowledgement. */
-    class Publisher implements Runnable {
-        List<Message> pubMsgs;
-        ByteString topic;
-
-        public Publisher(ByteString topic, List<Message> pubMsgs) {
-            this.pubMsgs = pubMsgs;
-            this.topic = topic;
-        }
-
-        @Override
-        public void run() {
-            LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
-
-            for (Message msg : pubMsgs) {
-
-                try {
-                    persistenceManager.persistMessage(new PersistRequest(topic, msg, testCallback, statusQueue));
-                    // wait a maximum of a minute
-                    Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
-                    if (b == null) {
-                        throw (failureException = new RuntimeException("Persisting timed out"));
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw (failureException = new RuntimeException(e));
-                }
-            }
-        }
-
-    }
-
-    /** Creates the persistence manager under test and clears any previously recorded failure. */
-    public void setUp() throws Exception {
-        logger.info("STARTING " + getClass());
-        persistenceManager = instantiatePersistenceManager();
-        failureException = null;
-        logger.info("Persistence Manager test setup finished");
-    }
-
-    /** @return the sequence id at which scans of a topic start */
-    abstract long getLowestSeqId();
-
-    /** @return a fresh instance of the persistence manager to test */
-    abstract PersistenceManager instantiatePersistenceManager() throws Exception;
-
-    /** Stops the persistence manager created in {@link #setUp()}. */
-    public void tearDown() throws Exception {
-        logger.info("tearDown starting");
-        persistenceManager.stop();
-        logger.info("FINISHED " + getClass());
-    }
-
-    protected ByteString getTopicName(int number) {
-        return ByteString.copyFromUtf8("topic" + number);
-    }
-
-    /**
-     * Publishes to all topics in parallel, then scans all topics in parallel, and finally checks
-     * that no thread recorded a failure and that every topic ended at the expected sequence id.
-     */
-    @Test(timeout=60000)
-    public void testPersistenceManager() throws Exception {
-        List<Thread> publisherThreads = new LinkedList<Thread>();
-        List<Thread> scannerThreads = new LinkedList<Thread>();
-        // Single-permit semaphore used as a "callback has fired" signal for acquiredTopic().
-        Semaphore latch = new Semaphore(1);
-
-        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
-            ByteString topic = getTopicName(i);
-
-            if (persistenceManager instanceof TopicOwnershipChangeListener) {
-
-                TopicOwnershipChangeListener tocl = (TopicOwnershipChangeListener) persistenceManager;
-
-                // Take the only permit; the callback hands it back when it completes.
-                latch.acquire();
-
-                tocl.acquiredTopic(topic, new Callback<Void>() {
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        failureException = new RuntimeException(exception);
-                        ((Semaphore) ctx).release();
-                    }
-
-                    @Override
-                    public void operationFinished(Object ctx, Void res) {
-                        ((Semaphore) ctx).release();
-                    }
-                }, latch);
-
-                // Blocks until the callback released the permit, then restores it for the next topic.
-                latch.acquire();
-                latch.release();
-                if (failureException != null) {
-                    throw (Exception) failureException.getCause();
-                }
-            }
-            List<Message> msgs = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, 1024);
-
-            Thread publisher = new Thread(new Publisher(topic, msgs));
-            publisherThreads.add(publisher);
-            publisher.start();
-
-            // Scanners consume the same list the publisher iterates, so they are only started
-            // after every publisher has been joined.
-            scannerThreads.add(new Thread(new ScanVerifier(topic, msgs)));
-        }
-        for (Thread t : publisherThreads) {
-            t.join();
-        }
-
-        for (Thread t : scannerThreads) {
-            t.start();
-        }
-
-        for (Thread t : scannerThreads) {
-            t.join();
-        }
-
-        assertNull(failureException);
-        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
-            assertEquals(getExpectedSeqId(NUM_MESSAGES_TO_TEST),
-                    persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent());
-        }
-
-    }
-
-    /**
-     * @param numPublished number of messages published to a topic
-     * @return the local sequence id the topic is expected to be at afterwards
-     */
-    abstract long getExpectedSeqId(int numPublished);
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
deleted file mode 100644
index 2e59a8a..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * Runs the {@link TestPersistenceManagerBlackBox} suite against a {@link ReadAheadCache} layered
- * on the {@link LocalDBPersistenceManager} instance.
- */
-public class TestReadAheadCacheBlackBox extends TestPersistenceManagerBlackBox {
-
-    /** Registers the inherited setup with JUnit; the base method carries no annotation. */
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-    }
-
-    /** Runs the inherited teardown, then resets the LocalDBPersistenceManager instance. */
-    @After
-    @Override
-    public void tearDown() throws Exception {
-        super.tearDown();
-        LocalDBPersistenceManager.instance().reset();
-    }
-
-    /** Builds and starts the cache under test with a default server configuration. */
-    @Override
-    PersistenceManager instantiatePersistenceManager() {
-        return new ReadAheadCache(LocalDBPersistenceManager.instance(), new ServerConfiguration()).start();
-    }
-
-    /** Scans start at sequence id 1. */
-    @Override
-    long getLowestSeqId() {
-        return 1;
-    }
-
-    /** The expected final sequence id equals the number of messages published. */
-    @Override
-    long getExpectedSeqId(int numPublished) {
-        return numPublished;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java b/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
deleted file mode 100644
index ae08005..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.persistence;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.HelperMethods;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.StubScanCallback;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-/**
- * White-box tests for {@link ReadAheadCache}: they drive the cache on top of a
- * {@link StubPersistenceManager} and inspect its internal maps and counters directly.
- */
-public class TestReadAheadCacheWhiteBox {
-    ByteString topic = ByteString.copyFromUtf8("testTopic");
-    final static int NUM_MESSAGES = 10;
-    final static int MSG_SIZE = 50;
-    List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES, MSG_SIZE);
-    StubPersistenceManager stubPersistenceManager;
-    ReadAheadCache cacheBasedPersistenceManager;
-    MyServerConfiguration myConf = new MyServerConfiguration();
-
-    /** Cache variant that executes every cache request inline instead of enqueueing it. */
-    class MyReadAheadCache extends ReadAheadCache {
-        public MyReadAheadCache(PersistenceManagerWithRangeScan persistenceManger, ServerConfiguration cfg) {
-            super(persistenceManger, cfg);
-        }
-
-        @Override
-        protected void enqueueWithoutFailureByTopic(ByteString topic, final CacheRequest obj) {
-            // make it perform in the same thread
-            obj.performRequest();
-        }
-
-    }
-
-    /** Configuration whose cache-related getters return fields the tests can change. */
-    class MyServerConfiguration extends ServerConfiguration {
-
-        // Note these are set up, so that the size limit will be reached before
-        // the count limit
-        int readAheadCount = NUM_MESSAGES / 2;
-        long readAheadSize = (long) (MSG_SIZE * 2.5);
-        long maxCacheSize = Integer.MAX_VALUE;
-        long cacheEntryTTL = 0L;
-
-        @Override
-        public int getReadAheadCount() {
-            return readAheadCount;
-        }
-
-        @Override
-        public long getReadAheadSizeBytes() {
-            return readAheadSize;
-        }
-
-        @Override
-        public long getMaximumCacheSize() {
-            return maxCacheSize;
-        }
-
-        @Override
-        public long getCacheEntryTTL() {
-            return cacheEntryTTL;
-        }
-
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        stubPersistenceManager = new StubPersistenceManager();
-        cacheBasedPersistenceManager = new MyReadAheadCache(stubPersistenceManager, myConf).start();
-    }
-
-    /**
-     * A persist that fails in the underlying manager must report an error and cache nothing; a
-     * following successful persist must be cached (checked by {@link #persistMessage}).
-     */
-    @Test(timeout=60000)
-    public void testPersistMessage() throws Exception {
-        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
-        PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
-
-        stubPersistenceManager.failure = true;
-        cacheBasedPersistenceManager.persistMessage(request);
-        assertNotNull(ConcurrencyUtils.take(callback.queue).right());
-
-        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                .getLocalComponent());
-        assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
-
-        stubPersistenceManager.failure = false;
-        persistMessage(messages.get(0));
-    }
-
-    /**
-     * Persists {@code msg} through the cache and asserts that the callback succeeded and that the
-     * cache now holds a non-stub entry with an equal message under the topic's current sequence id.
-     */
-    private void persistMessage(Message msg) throws Exception {
-        StubCallback<PubSubProtocol.MessageSeqId> callback = new StubCallback<PubSubProtocol.MessageSeqId>();
-        PersistRequest request = new PersistRequest(topic, msg, callback, null);
-        cacheBasedPersistenceManager.persistMessage(request);
-        assertNotNull(ConcurrencyUtils.take(callback.queue).left());
-        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
-                .getLocalComponent());
-        CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
-        assertNotNull(cacheValue);
-        assertFalse(cacheValue.isStub());
-        assertTrue(HelperMethods.areEqual(cacheValue.getMessage(), msg));
-
-    }
-
-    /**
-     * A scan that fails underneath must fail the callback and leave the cache empty; a scan that
-     * goes through fills {@code readAheadCount} cache entries, and the callback succeeds once the
-     * message is persisted.
-     */
-    @Test(timeout=60000)
-    public void testScanSingleMessage() throws Exception {
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        stubPersistenceManager.failure = true;
-
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-        assertTrue(callback.isFailed());
-        assertEquals(0, cacheBasedPersistenceManager.cache.size());
-
-        stubPersistenceManager.failure = false;
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
-
-        persistMessage(messages.get(0));
-        assertTrue(callback.isSuccess());
-
-    }
-
-    /** Checks which cache entries remain after successive {@code deliveredUntil} calls. */
-    @Test(timeout=60000)
-    public void testDeliveredUntil() throws Exception {
-        for (Message m : messages) {
-            persistMessage(m);
-        }
-        assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
-        long middle = messages.size() / 2;
-        cacheBasedPersistenceManager.deliveredUntil(topic, middle);
-
-        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
-
-        long middle2 = middle - 1;
-        cacheBasedPersistenceManager.deliveredUntil(topic, middle2);
-        // should have no effect
-        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
-
-        // delivered all messages
-        cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size());
-        // the cache, both indexes and the size counter should now be empty
-        assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertEquals(0, cacheBasedPersistenceManager.presentCacheSize.get());
-
-    }
-
-    /** Checks the cache size after read-aheads that start at different sequence ids. */
-    @Test(timeout=60000)
-    public void testDoReadAhead() {
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
-
-        request = new ScanRequest(topic, myConf.readAheadCount / 2 - 1, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
-
-        request = new ScanRequest(topic, myConf.readAheadCount / 2 + 2, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-        assertEquals((int) (1.5 * myConf.readAheadCount), cacheBasedPersistenceManager.cache.size());
-
-    }
-
-    /**
-     * After the cache is cleared, one scan must cache {@code ceil(readAheadSize / MSG_SIZE)}
-     * entries, i.e. the configured byte limit decides how many are cached, not the count limit.
-     */
-    @Test(timeout=60000)
-    public void testReadAheadSizeLimit() throws Exception {
-        for (Message m : messages) {
-            persistMessage(m);
-        }
-        cacheBasedPersistenceManager.cache.clear();
-        StubScanCallback callback = new StubScanCallback();
-        ScanRequest request = new ScanRequest(topic, 1, callback, null);
-        cacheBasedPersistenceManager.scanSingleMessage(request);
-
-        assertTrue(callback.isSuccess());
-        assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
-                     .size());
-
-    }
-
-    /** Checks the message limit of the range request built by {@code doReadAheadStartingFrom}. */
-    @Test(timeout=60000)
-    public void testDoReadAheadStartingFrom() throws Exception {
-        persistMessage(messages.get(0));
-        int readAheadCount = 5;
-        int start = 1;
-        RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
-                                            readAheadCount);
-        assertNull(readAheadRequest);
-
-        StubScanCallback callback = new StubScanCallback();
-        int end = 100;
-        ScanRequest request = new ScanRequest(topic, end, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        int pos = 98;
-        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
-        assertEquals(end - pos, readAheadRequest.messageLimit);
-
-        end = 200;
-        request = new ScanRequest(topic, end, callback, null);
-        cacheBasedPersistenceManager.doReadAhead(request);
-
-        // too far back
-        pos = 150;
-        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
-        assertEquals(readAheadCount, readAheadRequest.messageLimit);
-    }
-
-    /** Adding one message must update the cache map, the size counter and both indexes. */
-    @Test(timeout=60000)
-    public void testAddMessageToCache() {
-        CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
-        assertEquals(1, cacheBasedPersistenceManager.cache.size());
-        assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize.get());
-        assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
-
-        CacheValue value = cacheBasedPersistenceManager.cache.get(key);
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.get(value.timeOfAddition).contains(key));
-    }
-
-    /** Removing the only cached message must leave the cache map and both indexes empty. */
-    @Test(timeout=60000)
-    public void testRemoveMessageFromCache() {
-        CacheKey key = new CacheKey(topic, 1);
-        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), MathUtils.now());
-        cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true);
-        assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
-        assertTrue(cacheBasedPersistenceManager.cacheSegment.get()
-                   .timeIndexOfAddition.isEmpty());
-    }
-
-    /** After the maximum cache size is lowered, a collection pass must leave {@code n} entries. */
-    @Test(timeout=60000)
-    public void testCollectOldCacheEntries() {
-        int i = 1;
-        for (Message m : messages) {
-            CacheKey key = new CacheKey(topic, i);
-            // use the sequence number as the time of addition, giving each entry its own time
-            cacheBasedPersistenceManager.addMessageToCache(key, m, i);
-            i++;
-        }
-
-        int n = 2;
-        myConf.maxCacheSize = n * MSG_SIZE * myConf.getNumReadAheadCacheThreads();
-        cacheBasedPersistenceManager.reloadConf(myConf);
-        cacheBasedPersistenceManager.collectOldOrExpiredCacheEntries(
-                cacheBasedPersistenceManager.cacheSegment.get());
-        assertEquals(n, cacheBasedPersistenceManager.cache.size());
-        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
-                     .timeIndexOfAddition.size());
-    }
-
-    /**
-     * With a TTL configured, only the {@code n} entries added after the pause are expected to
-     * remain cached.
-     */
-    @Test(timeout=60000)
-    public void testCollectExpiredCacheEntries() throws Exception {
-        int i = 1;
-        int n = 2;
-        long ttl = 5000L;
-        myConf.cacheEntryTTL = ttl;
-        long curTime = MathUtils.now();
-        cacheBasedPersistenceManager.reloadConf(myConf);
-        for (Message m : messages) {
-            CacheKey key = new CacheKey(topic, i);
-            cacheBasedPersistenceManager.addMessageToCache(key, m, curTime++);
-            if (i == NUM_MESSAGES - n) {
-                // NOTE(review): real sleep of 2 * ttl (10 s); presumably expiry is judged
-                // against the wall clock, so the pause cannot simply be dropped - confirm.
-                Thread.sleep(2 * ttl);
-                curTime += 2 * ttl;
-            }
-            i++;
-        }
-
-        assertEquals(n, cacheBasedPersistenceManager.cache.size());
-        assertEquals(n, cacheBasedPersistenceManager.cacheSegment.get()
-                     .timeIndexOfAddition.size());
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
deleted file mode 100644
index 26b2ce3..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-/**
- * {@link InMemorySubscriptionManager} whose subscribe path can be switched to fail on demand.
- */
-public class StubSubscriptionManager extends InMemorySubscriptionManager {
-    /** When true, every subscribe request is answered with a ServiceDownException. */
-    boolean fail;
-
-    public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, DeliveryManager dm,
-                                   ServerConfiguration conf, ScheduledExecutorService scheduler) {
-        super(conf, tm, pm, dm, scheduler);
-    }
-
-    /** Turns the forced failure of subscribe requests on or off. */
-    public void setFail(boolean fail) {
-        this.fail = fail;
-    }
-
-    /**
-     * Delegates to the in-memory implementation unless {@link #fail} is set, in which case the
-     * callback is failed immediately and the request is not served.
-     */
-    @Override
-    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                                      Callback<SubscriptionData> callback, Object ctx) {
-        if (!fail) {
-            super.serveSubscribeRequest(topic, subRequest, consumeSeqId, callback, ctx);
-            return;
-        }
-        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
deleted file mode 100644
index 0e0f670..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestMMSubscriptionManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Before;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
-import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
-
-public class TestMMSubscriptionManager extends ZooKeeperTestBase {
- MetadataManagerFactory mm;
- MMSubscriptionManager sm;
- ServerConfiguration cfg = new ServerConfiguration();
- SynchronousQueue<Either<SubscriptionData, PubSubException>> subDataCallbackQueue = new SynchronousQueue<Either<SubscriptionData, PubSubException>>();
- SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>();
-
- Callback<Void> voidCallback;
- Callback<SubscriptionData> subDataCallback;
-
- /**
-  * Builds the subscription manager under test and the two callbacks the tests pass to it.
-  * Each callback forwards its outcome to the matching queue as an Either: the left side
-  * carries a success value, the right side the PubSubException.
-  */
- @Before
- @Override
- public void setUp() throws Exception {
-     super.setUp();
-     cfg = new ServerConfiguration();
-     final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-     mm = MetadataManagerFactory.newMetadataManagerFactory(cfg, zk);
-     sm = new MMSubscriptionManager(cfg, mm, new TrivialOwnAllTopicManager(cfg, executor),
-                                    LocalDBPersistenceManager.instance(), null, executor);
-
-     // The queues are SynchronousQueues, so the hand-off is done on the executor
-     // (presumably to keep the thread that invokes the callback from waiting for a taker).
-     voidCallback = new Callback<Void>() {
-         @Override
-         public void operationFinished(Object ctx, Void resultOfOperation) {
-             executor.execute(new Runnable() {
-                 public void run() {
-                     ConcurrencyUtils.put(BooleanCallbackQueue, Either.of(true, (PubSubException) null));
-                 }
-             });
-         }
-
-         @Override
-         public void operationFailed(Object ctx, final PubSubException exception) {
-             executor.execute(new Runnable() {
-                 public void run() {
-                     ConcurrencyUtils.put(BooleanCallbackQueue, Either.of((Boolean) null, exception));
-                 }
-             });
-         }
-     };
-
-     subDataCallback = new Callback<SubscriptionData>() {
-         @Override
-         public void operationFinished(Object ctx, final SubscriptionData resultOfOperation) {
-             executor.execute(new Runnable() {
-                 public void run() {
-                     ConcurrencyUtils.put(subDataCallbackQueue, Either.of(resultOfOperation, (PubSubException) null));
-                 }
-             });
-         }
-
-         @Override
-         public void operationFailed(Object ctx, final PubSubException exception) {
-             executor.execute(new Runnable() {
-                 public void run() {
-                     ConcurrencyUtils.put(subDataCallbackQueue, Either.of((SubscriptionData) null, exception));
-                 }
-             });
-         }
-     };
-
- }
-
- @Test(timeout=60000)
- public void testBasics() throws Exception {
-
- ByteString topic1 = ByteString.copyFromUtf8("topic1");
- ByteString sub1 = ByteString.copyFromUtf8("sub1");
-
- //
- // No topics acquired.
- //
- SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(sub1).build();
- MessageSeqId msgId = MessageSeqId.newBuilder().setLocalComponent(100).build();
-
- sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
-
- Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
- PubSubException.ServerNotResponsibleForTopicException.class);
-
- sm.unsubscribe(topic1, sub1, voidCallback, null);
-
- Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
- PubSubException.ServerNotResponsibleForTopicException.class);
-
- //
- // Acquire topic.
- //
-
- sm.acquiredTopic(topic1, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
-
- Assert.assertTrue(sm.top2sub2seq.containsKey(topic1));
- Assert.assertEquals(0, sm.top2sub2seq.get(topic1).size());
-
- sm.unsubscribe(topic1, sub1, voidCallback, null);
- Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
- PubSubException.ClientNotSubscribedException.class);
-
- //
- // Try to attach to a subscription.
- subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
- .build();
-
- sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
- Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
- PubSubException.ClientNotSubscribedException.class);
-
- // now create
- subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
- .build();
- sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
- Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(subDataCallbackQueue).left().getState().getMsgId().getLocalComponent());
- Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
-
- // try to create again
- sm.serveSubscribeRequest(topic1, subRequest, msgId, subDataCallback, null);
- Assert.assertEquals(ConcurrencyUtils.take(subDataCallbackQueue).right().getClass(),
- PubSubException.ClientAlreadySubscribedException.class);
- Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
-
- sm.lostTopic(topic1);
- sm.acquiredTopic(topic1, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
-
- // try to attach
- subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
- .build();
- MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
- sm.serveSubscribeRequest(topic1, subRequest, msgId1, subDataCallback, null);
- Assert.assertEquals(msgId.getLocalComponent(), subDataCallbackQueue.take().left().getState().getMsgId().getLocalComponent());
- Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
-
- // now manipulate the consume ptrs
- // dont give it enough to have it persist to ZK
- MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
- msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
- sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
- Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
- Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
- .getLocalComponent());
-
- // give it more so that it will write to ZK
- MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
- msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
- sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
-
- sm.lostTopic(topic1);
- sm.acquiredTopic(topic1, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
-
- Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
- .getLocalComponent());
- Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
- .getLocalComponent());
-
- // finally unsubscribe
- sm.unsubscribe(topic1, sub1, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
-
- sm.lostTopic(topic1);
- sm.acquiredTopic(topic1, voidCallback, null);
- Assert.assertTrue(BooleanCallbackQueue.take().left());
- Assert.assertFalse(sm.top2sub2seq.get(topic1).containsKey(sub1));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java b/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
deleted file mode 100644
index d5569de..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/subscriptions/TestUpdateSubscriptionState.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-
-public class TestUpdateSubscriptionState extends HedwigHubTestBase {
-
- private static final int RETENTION_SECS_VALUE = 100;
-
- // Client side variables
- protected HedwigClient client;
- protected Publisher publisher;
- protected Subscriber subscriber;
-
- // SynchronousQueues to verify async calls
- private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-
- // Test implementation of subscriber's message handler
- class OrderCheckingMessageHandler implements MessageHandler {
-
- ByteString topic;
- ByteString subscriberId;
- int startMsgId;
- int numMsgs;
- int endMsgId;
- boolean inOrder = true;
-
- OrderCheckingMessageHandler(ByteString topic, ByteString subscriberId,
- int startMsgId, int numMsgs) {
- this.topic = topic;
- this.subscriberId = subscriberId;
- this.startMsgId = startMsgId;
- this.numMsgs = numMsgs;
- this.endMsgId = startMsgId + numMsgs - 1;
- }
-
- @Override
- public void deliver(ByteString thisTopic, ByteString thisSubscriberId,
- Message msg, Callback<Void> callback, Object context) {
- if (!topic.equals(thisTopic) ||
- !subscriberId.equals(thisSubscriberId)) {
- return;
- }
- // check order
- int msgId = Integer.parseInt(msg.getBody().toStringUtf8());
- if (logger.isDebugEnabled()) {
- logger.debug("Received message : " + msgId);
- }
-
- if (inOrder) {
- if (startMsgId != msgId) {
- logger.error("Expected message " + startMsgId + ", but received message " + msgId);
- inOrder = false;
- } else {
- ++startMsgId;
- }
- }
- callback.operationFinished(context, null);
- if (msgId == endMsgId) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- if (logger.isDebugEnabled()) {
- logger.debug("Deliver finished!");
- }
- ConcurrencyUtils.put(queue, true);
- }
- }).start();
- }
- }
-
- public boolean isInOrder() {
- return inOrder;
- }
- }
-
- public TestUpdateSubscriptionState() {
- super(1);
- }
-
- protected class NewHubServerConfiguration extends HubServerConfiguration {
-
- public NewHubServerConfiguration(int serverPort, int sslServerPort) {
- super(serverPort, sslServerPort);
- }
-
- @Override
- public int getRetentionSecs() {
- return RETENTION_SECS_VALUE;
- }
-
- }
-
- @Override
- protected ServerConfiguration getServerConfiguration(int serverPort, int sslServerPort) {
- return new NewHubServerConfiguration(serverPort, sslServerPort);
- }
-
- protected class TestClientConfiguration extends HubClientConfiguration {
- @Override
- public boolean isAutoSendConsumeMessageEnabled() {
- return true;
- }
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- client = new HedwigClient(new TestClientConfiguration());
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- client.close();
- super.tearDown();
- }
-
- @Test(timeout=60000)
- public void testConsumeWhenTopicRelease() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenTopicRelease");
- ByteString subId = ByteString.copyFromUtf8("mysub");
-
- int startMsgId = 0;
- int numMsgs = 10;
- // subscriber in client
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- subscriber.subscribe(topic, subId, opts);
- // start delivery
- OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler(
- topic, subId, startMsgId, numMsgs);
- subscriber.startDelivery(topic, subId, ocm);
- for (int i=0; i<numMsgs; i++) {
- Message msg = Message.newBuilder().setBody(
- ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
- publisher.publish(topic, msg);
- }
- logger.info("Publish finished.");
- queue.take();
- logger.info("Deliver finished.");
- // check messages received in order
- assertTrue(ocm.isInOrder());
-
- // wait for retention secs
- Thread.sleep((RETENTION_SECS_VALUE + 2) * 1000);
-
- subscriber.stopDelivery(topic, subId);
- subscriber.closeSubscription(topic, subId);
-
- startMsgId = 20;
- // reconnect it again
- subscriber.subscribe(topic, subId, opts);
- ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs);
- subscriber.startDelivery(topic, subId, ocm);
- for (int i=0; i<numMsgs; i++) {
- Message msg = Message.newBuilder().setBody(
- ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
- publisher.publish(topic, msg);
- }
- queue.take();
- // check messages received in order
- assertTrue(ocm.isInOrder());
- }
-
- @Test(timeout=60000)
- public void testConsumeWhenHubShutdown() throws Exception {
- ByteString topic = ByteString.copyFromUtf8("TestConsumeWhenHubShutdown");
- ByteString subId = ByteString.copyFromUtf8("mysub");
-
- int startMsgId = 0;
- int numMsgs = 10;
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- // subscriber in client
- subscriber.subscribe(topic, subId, opts);
- // start delivery
- OrderCheckingMessageHandler ocm = new OrderCheckingMessageHandler(
- topic, subId, startMsgId, numMsgs);
- subscriber.startDelivery(topic, subId, ocm);
- for (int i=0; i<numMsgs; i++) {
- Message msg = Message.newBuilder().setBody(
- ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
- publisher.publish(topic, msg);
- }
- logger.info("Publish finished.");
- queue.take();
- logger.info("Deliver finished.");
- // check messages received in order
- assertTrue(ocm.isInOrder());
- // make sure consume request sent to hub server before shut down
- Thread.sleep(2000);
- subscriber.stopDelivery(topic, subId);
- subscriber.closeSubscription(topic, subId);
-
- stopHubServers();
- Thread.sleep(1000);
- startHubServers();
-
- startMsgId = 20;
- // reconnect it again
- subscriber.subscribe(topic, subId, opts);
- ocm = new OrderCheckingMessageHandler(topic, subId, startMsgId, numMsgs);
- subscriber.startDelivery(topic, subId, ocm);
- for (int i=0; i<numMsgs; i++) {
- Message msg = Message.newBuilder().setBody(
- ByteString.copyFromUtf8(Integer.toString(startMsgId + i))).build();
- publisher.publish(topic, msg);
- }
- queue.take();
- // check messages received in order
- assertTrue(ocm.isInOrder());
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
deleted file mode 100644
index b66196e..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.concurrent.Executors;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class StubTopicManager extends TrivialOwnAllTopicManager {
-
- boolean shouldOwnEveryNewTopic = false;
- boolean shouldError = false;
-
- public void setShouldOwnEveryNewTopic(boolean shouldOwnEveryNewTopic) {
- this.shouldOwnEveryNewTopic = shouldOwnEveryNewTopic;
- }
-
- public void setShouldError(boolean shouldError) {
- this.shouldError = shouldError;
- }
-
- public StubTopicManager(ServerConfiguration conf) throws UnknownHostException {
- super(conf, Executors.newSingleThreadScheduledExecutor());
- }
-
- @Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
-
- if (shouldError) {
- cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
- return;
- }
- if (null != topics.getIfPresent(topic) // already own it
- || shouldOwnEveryNewTopic) {
- super.realGetOwner(topic, shouldClaim, cb, ctx);
- return;
- } else {
- // return some other address
- cb.operationFinished(ctx, new HedwigSocketAddress("124.31.0.1:80"));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
deleted file mode 100644
index 04fb451..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.server.HedwigHubTestBase;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-
-public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
-
- // Client variables
- protected HedwigClient client;
- protected Publisher publisher;
- protected Subscriber subscriber;
-
- final LinkedBlockingQueue<ByteString> subscribers =
- new LinkedBlockingQueue<ByteString>();
- final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
- final int numSubscribers = 300;
- final AtomicInteger numDone = new AtomicInteger(0);
-
- // SynchronousQueues to verify async calls
- private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
-
- class SubCallback implements Callback<Void> {
-
- ByteString subId;
-
- public SubCallback(ByteString subId) {
- this.subId = subId;
- }
-
- @Override
- public void operationFinished(Object ctx,
- Void resultOfOperation) {
- if (logger.isDebugEnabled()) {
- logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
- }
- int done = numDone.incrementAndGet();
- if (done == numSubscribers) {
- ConcurrencyUtils.put(queue, false);
- }
- }
-
- @Override
- public void operationFailed(Object ctx,
- PubSubException exception) {
- if (logger.isDebugEnabled()) {
- logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
- }
- ConcurrencyUtils.put(subscribers, subId);
- // ConcurrencyUtils.put(queue, false);
- }
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- client = new HedwigClient(new HubClientConfiguration());
-
- publisher = client.getPublisher();
- subscriber = client.getSubscriber();
- }
-
- @Override
- public void tearDown() throws Exception {
- // sub.interrupt();
- // sub.join();
-
- client.close();
- super.tearDown();
- }
-
- @Test(timeout=60000)
- public void testTopicAcquistion() throws Exception {
- logger.info("Start concurrent topic acquistion test.");
-
- // let one bookie down to cause not enough bookie exception
- logger.info("Tear down one bookie server.");
- bktb.tearDownOneBookieServer();
-
- // In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
- // because the tear down bookie server's zk node need time to disappear
- // some subscriptions will create ledger successfully, then other subscriptions will fail.
- // the race condition will be: topic manager own topic but persistence manager doesn't
-
- // 300 subscribers subscribe to a same topic
- final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
- numDone.set(0);
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- for (int i=0; i<numSubscribers; i++) {
- ByteString subId = ByteString.copyFromUtf8("sub-" + i);
- if (logger.isDebugEnabled()) {
- logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
- }
- subscriber.asyncSubscribe(topic, subId, opts,
- new Callback<Void>() {
-
- private void tick() {
- if (numDone.incrementAndGet() == numSubscribers) {
- ConcurrencyUtils.put(queue, true);
- }
- }
-
- @Override
- public void operationFinished(Object ctx,
- Void resultOfOperation) {
- tick();
- }
-
- @Override
- public void operationFailed(Object ctx,
- PubSubException exception) {
- if (exception instanceof PubSubException.ServiceDownException) {
- String msg = exception.getMessage();
- if (msg.indexOf("ServerRedirectLoopException") > 0) {
- inRedirectLoop.set(true);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Operation failed : ", exception);
- }
- }
- tick();
- }
-
- },
- null);
- }
-
- queue.take();
-
- // TODO: remove comment after we fix the issue
- // Assert.assertEquals(false, inRedirectLoop.get());
-
- // start a thread to send subscriptions
- numDone.set(0);
- Thread sub = new Thread(new Runnable() {
-
- @Override
- public void run() {
- logger.info("sub thread started");
- try {
- // 100 subscribers subscribe to a same topic
- for (int i=0; i<numSubscribers; i++) {
- ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
- subscribers.put(subscriberId);
- }
-
- ByteString subId;
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- while (true) {
- subId = subscribers.take();
-
- if (logger.isDebugEnabled()) {
- logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
- }
- subscriber.asyncSubscribe(topic, subId, opts, new SubCallback(subId), null);
- }
- // subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context)
- } catch (InterruptedException ie) {
- // break
- logger.warn("Interrupted : ", ie);
- }
- }
-
- });
- sub.start();
- Thread.sleep(2000);
-
- // start a new bookie server
- logger.info("start new bookie server");
- bktb.startUpNewBookieServer();
-
- // hope that all the subscriptions will be OK
- queue.take();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
deleted file mode 100644
index 77c6fad..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.topics;
-
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestHubInfo {
-
- @Test(timeout=60000)
- public void testParseHubInfo() throws Exception {
- HedwigSocketAddress addr = new HedwigSocketAddress("localhost", 9086, 9087);
- HubInfo hubInfo1 = new HubInfo(addr, 9999);
-
- String strHubInfo1 = hubInfo1.toString();
- HubInfo parsedHubInfo1 = HubInfo.parse(strHubInfo1);
- Assert.assertEquals("Hub infos should be same", hubInfo1, parsedHubInfo1);
-
- HubInfo hubInfo2 = new HubInfo(addr, 0);
- HubInfo parsedHubInfo2 = HubInfo.parse("localhost:9086:9087");
- Assert.assertEquals("Hub infos w/o zxid should be same", hubInfo2, parsedHubInfo2);
-
- // parse empty string
- try {
- HubInfo.parse("");
- Assert.fail("Should throw InvalidHubInfoException parsing empty string.");
- } catch (HubInfo.InvalidHubInfoException ihie) {
- }
-
- // parse corrupted hostname
- try {
- HubInfo.parse("localhost,a,b,c");
- Assert.fail("Should throw InvalidHubInfoException parsing corrupted hostname.");
- } catch (HubInfo.InvalidHubInfoException ihie) {
- }
-
- // parse corrupted string
- try {
- HubInfo.parse("hostname: localhost:9086:9087");
- Assert.fail("Should throw InvalidHubInfoException parsing corrupted string.");
- } catch (HubInfo.InvalidHubInfoException ihie) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
deleted file mode 100644
index f14d601..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestHubLoad.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.topics;
-
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestHubLoad {
-
- @Test(timeout=60000)
- public void testParseHubLoad() throws Exception {
- HubLoad hubLoad1 = new HubLoad(9999);
-
- String strHubLoad1 = hubLoad1.toString();
- HubLoad parsedHubLoad1 = HubLoad.parse(strHubLoad1);
- Assert.assertEquals("Hub load data should be same", hubLoad1, parsedHubLoad1);
-
- final int numTopics = 9998;
- HubLoad hubLoad2 = new HubLoad(numTopics);
- HubLoad parsedHubLoad2 = HubLoad.parse(numTopics + "");
- Assert.assertEquals("Hub load data not protobuf encoded should be same", hubLoad2, parsedHubLoad2);
-
- // parse empty string
- try {
- HubLoad.parse("");
- Assert.fail("Should throw InvalidHubLoadException parsing empty string.");
- } catch (HubLoad.InvalidHubLoadException ihie) {
- }
-
- // parse corrupted numTopics
- try {
- HubLoad.parse("9998_x");
- Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
- } catch (HubLoad.InvalidHubLoadException ihie) {
- }
-
- // parse corrupted string
- try {
- HubLoad.parse("hostname: 9998_x");
- Assert.fail("Should throw InvalidHubLoadException parsing corrupted hub load data.");
- } catch (HubLoad.InvalidHubLoadException ihie) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java b/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
deleted file mode 100644
index c75ff05..0000000
--- a/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestMMTopicManager.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.StubCallback;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.CompositeException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactoryTestCase;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestMMTopicManager extends MetadataManagerFactoryTestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestMMTopicManager.class);
-
- protected MMTopicManager tm;
- protected TopicOwnershipManager tom;
-
- protected class CallbackQueue<T> implements Callback<T> {
- SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
-
- public SynchronousQueue<Either<T, Exception>> getQueue() {
- return q;
- }
-
- public Either<T, Exception> take() throws InterruptedException {
- return q.take();
- }
-
- @Override
- public void operationFailed(Object ctx, final PubSubException exception) {
- LOG.error("got exception: " + exception);
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
- }
- }).start();
- }
-
- @Override
- public void operationFinished(Object ctx, final T resultOfOperation) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
- }
- }).start();
- }
- }
-
- protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
- protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
- protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
-
- protected ByteString topic = ByteString.copyFromUtf8("topic");
- protected HedwigSocketAddress me;
- protected ScheduledExecutorService scheduler;
-
- public TestMMTopicManager(String metaManagerCls) {
- super(metaManagerCls);
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- me = conf.getServerAddr();
- scheduler = Executors.newSingleThreadScheduledExecutor();
- tom = metadataManagerFactory.newTopicOwnershipManager();
- tm = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- tom.close();
- tm.stop();
- super.tearDown();
- }
-
- @Test(timeout=60000)
- public void testGetOwnerSingle() throws Exception {
- tm.getOwner(topic, false, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- }
-
- protected ByteString mkTopic(int i) {
- return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
- }
-
- protected <T> T check(Either<T, Exception> ex) throws Exception {
- if (ex.left() == null)
- throw ex.right();
- else
- return ex.left();
- }
-
- public static class CustomServerConfiguration extends ServerConfiguration {
- int port;
-
- public CustomServerConfiguration(int port) {
- this.port = port;
- }
-
- @Override
- public int getServerPort() {
- return port;
- }
- }
-
- @Test(timeout=60000)
- public void testGetOwnerMulti() throws Exception {
- ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1),
- conf2 = new CustomServerConfiguration(conf.getServerPort() + 2);
- MMTopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler),
- tm2 = new MMTopicManager(conf2, zk, metadataManagerFactory, scheduler);
-
- tm.getOwner(topic, false, addrCbq, null);
- HedwigSocketAddress owner = check(addrCbq.take());
-
- for (int i = 0; i < 100; ++i) {
- tm.getOwner(topic, false, addrCbq, null);
- Assert.assertEquals(owner, check(addrCbq.take()));
-
- tm1.getOwner(topic, false, addrCbq, null);
- Assert.assertEquals(owner, check(addrCbq.take()));
-
- tm2.getOwner(topic, false, addrCbq, null);
- Assert.assertEquals(owner, check(addrCbq.take()));
- }
-
- for (int i = 0; i < 100; ++i) {
- if (!owner.equals(me))
- break;
- tm.getOwner(mkTopic(i), false, addrCbq, null);
- owner = check(addrCbq.take());
- if (i == 99)
- Assert.fail("Never chose another owner");
- }
-
- tm1.stop();
- tm2.stop();
- }
-
- @Test(timeout=60000)
- public void testLoadBalancing() throws Exception {
- tm.getOwner(topic, false, addrCbq, null);
-
- Assert.assertEquals(me, check(addrCbq.take()));
-
- ServerConfiguration conf1 = new CustomServerConfiguration(conf.getServerPort() + 1);
- TopicManager tm1 = new MMTopicManager(conf1, zk, metadataManagerFactory, scheduler);
-
- ByteString topic1 = mkTopic(1);
- tm.getOwner(topic1, false, addrCbq, null);
- Assert.assertEquals(conf1.getServerAddr(), check(addrCbq.take()));
-
- tm1.stop();
- }
-
- class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
- boolean failure;
- SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
-
- public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
- this.bsQueue = bsQueue;
- }
-
- public void setFailure(boolean failure) {
- this.failure = failure;
- }
-
- @Override
- public void lostTopic(final ByteString topic) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
- }
- }).start();
- }
-
- public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
- if (failure) {
- callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
- } else {
- callback.operationFinished(ctx, null);
- }
- }
- }).start();
- }
- }
-
- @Test(timeout=60000)
- public void testOwnershipChange() throws Exception {
- SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
-
- StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
-
- tm.addTopicOwnershipChangeListener(listener);
-
- // regular acquire
- tm.getOwner(topic, true, addrCbq, null);
- Pair<ByteString, Boolean> pair = bsQueue.take();
- Assert.assertEquals(topic, pair.first());
- Assert.assertTrue(pair.second());
- Assert.assertEquals(me, check(addrCbq.take()));
- assertOwnershipNodeExists();
-
- // topic that I already own
- tm.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- Assert.assertTrue(bsQueue.isEmpty());
- assertOwnershipNodeExists();
-
- // regular release
- tm.releaseTopic(topic, cb, null);
- pair = bsQueue.take();
- Assert.assertEquals(topic, pair.first());
- Assert.assertFalse(pair.second());
- Assert.assertTrue(queue.take());
- assertOwnershipNodeDoesntExist();
-
- // releasing topic that I don't own
- tm.releaseTopic(mkTopic(0), cb, null);
- Assert.assertTrue(queue.take());
- Assert.assertTrue(bsQueue.isEmpty());
-
- // set listener to return error
- listener.setFailure(true);
-
- tm.getOwner(topic, true, addrCbq, null);
- pair = bsQueue.take();
- Assert.assertEquals(topic, pair.first());
- Assert.assertTrue(pair.second());
- Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
- .getExceptions().iterator().next().getClass());
- Assert.assertFalse(null != tm.topics.getIfPresent(topic));
- Thread.sleep(100);
- assertOwnershipNodeDoesntExist();
-
- }
-
- public void assertOwnershipNodeExists() throws Exception {
- StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
- tom.readOwnerInfo(topic, callback, null);
- Versioned<HubInfo> hubInfo = callback.queue.take().left();
- Assert.assertEquals(tm.addr, hubInfo.getValue().getAddress());
- }
-
- public void assertOwnershipNodeDoesntExist() throws Exception {
- StubCallback<Versioned<HubInfo>> callback = new StubCallback<Versioned<HubInfo>>();
- tom.readOwnerInfo(topic, callback, null);
- Versioned<HubInfo> hubInfo = callback.queue.take().left();
- Assert.assertEquals(null, hubInfo);
- }
-
- @Test(timeout=60000)
- public void testZKClientDisconnected() throws Exception {
- // First assert ownership of the topic
- tm.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
-
- // Suspend the ZKTopicManager and make sure calls to getOwner error out
- tm.isSuspended = true;
- tm.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
- // Release the topic. This should not error out even if suspended.
- tm.releaseTopic(topic, cb, null);
- Assert.assertTrue(queue.take());
- assertOwnershipNodeDoesntExist();
-
- // Restart the ZKTopicManager and make sure calls to getOwner are okay
- tm.isSuspended = false;
- tm.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- assertOwnershipNodeExists();
- }
-
- @Test(timeout=60000)
- public void testRetentionAfterAccess() throws Exception {
- conf.getConf().setProperty("retention_secs_after_access", "5");
- MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
- tm1.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- Thread.sleep(6000L);
- tm1.topics.cleanUp();
- Thread.sleep(2000L);
- assertOwnershipNodeDoesntExist();
- tm1.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- Thread.sleep(1000L);
- tm1.topics.cleanUp();
- Thread.sleep(2000L);
- assertOwnershipNodeExists();
-
- tm1.stop();
- }
-
- @Test(timeout=60000)
- public void testMaxNumTopics() throws Exception {
- conf.getConf().setProperty("max_num_topics", "1");
- MMTopicManager tm1 = new MMTopicManager(conf, zk, metadataManagerFactory, scheduler);
- tm1.getOwner(topic, true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- assertOwnershipNodeExists();
- tm1.getOwner(ByteString.copyFromUtf8("MaxNumTopic"),
- true, addrCbq, null);
- Assert.assertEquals(me, check(addrCbq.take()));
- Thread.sleep(2000L);
- assertOwnershipNodeDoesntExist();
- tm1.stop();
- }
-
-
-}