You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC
svn commit: r987314 [16/16] - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/ src/contrib/hedwig/client/
src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/
src/contrib/hedwig/client/src/main/cpp/
src/contrib/hedwig/client/src/main/cp...
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+
+public abstract class TestPersistenceManagerBlackBox extends TestCase {
+ protected PersistenceManager persistenceManager;
+ protected int NUM_MESSAGES_TO_TEST = 5;
+ protected int NUM_TOPICS_TO_TEST = 5;
+ static Logger logger = Logger.getLogger(TestPersistenceManagerBlackBox.class);
+ TestCallback testCallback = new TestCallback();
+
+ RuntimeException failureException;
+
+ class TestCallback implements Callback<Long> {
+ // Persist callback: reports success through the status queue passed as ctx and stores failures in failureException.
+ public void operationFailed(Object ctx, PubSubException exception) {
+ throw (failureException = new RuntimeException(exception)); // remember the failure, then rethrow on the calling thread
+ }
+
+ @SuppressWarnings("unchecked")
+ public void operationFinished(Object ctx, Long resultOfOperation) {
+ LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; // ctx must be the status queue given to the request
+ try {
+ statusQueue.put(true); // signal whoever is polling the queue that the operation finished
+ } catch (InterruptedException e) {
+ throw (failureException = new RuntimeException(e));
+ }
+ }
+ }
+
+ class RangeScanVerifierListener implements ScanCallback { // verifies a range scan against the published messages, in order
+ List<Message> pubMsgs; // messages still expected; the head is removed each time a scanned message matches it
+
+ public RangeScanVerifierListener(List<Message> pubMsgs) {
+ this.pubMsgs = pubMsgs;
+ }
+
+ public void messageScanned(Object ctx, Message recvMessage) {
+ if (pubMsgs.isEmpty()) {
+ throw (failureException = new RuntimeException("Message received when none expected"));
+ }
+
+ Message pubMsg = pubMsgs.get(0); // next message expected from the scan
+ if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
+ throw (failureException = new RuntimeException("Scanned message not equal to expected"));
+ }
+ pubMsgs.remove(0);
+ }
+
+ public void scanFailed(Object ctx, Exception exception) {
+ throw (failureException = new RuntimeException(exception));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void scanFinished(Object ctx, ReasonForFinish reason) {
+ if (reason != ReasonForFinish.NO_MORE_MESSAGES) { // any other reason is treated as the scan stopping early
+ throw (failureException = new RuntimeException("Scan finished prematurely " + reason));
+ }
+ LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; // ctx must be the status queue given to the scan request
+ try {
+ statusQueue.put(true); // tell the waiting ScanVerifier that the scan completed
+ } catch (InterruptedException e) {
+ throw (failureException = new RuntimeException(e));
+ }
+ }
+ // NOTE(review): scanFinished does not check that pubMsgs is empty, so a scan that returns too few messages is not flagged here.
+ }
+
+ class PointScanVerifierListener implements ScanCallback { // verifies single-message scans one at a time, chaining the next scan itself
+ List<Message> pubMsgs; // messages still expected; the head is removed each time a scanned message matches it
+ ByteString topic; // topic being scanned, needed to issue the follow-up scan requests
+
+ public PointScanVerifierListener(List<Message> pubMsgs, ByteString topic) {
+ this.topic = topic;
+ this.pubMsgs = pubMsgs;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void messageScanned(Object ctx, Message recvMessage) {
+ if (pubMsgs.isEmpty()) { // same guard as RangeScanVerifierListener: record the failure instead of an IndexOutOfBoundsException
+ throw (failureException = new RuntimeException("Message received when none expected"));
+ }
+ Message pubMsg = pubMsgs.get(0); // next message expected from the scan
+ if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
+ throw (failureException = new RuntimeException("Scanned message not equal to expected"));
+ }
+ pubMsgs.remove(0);
+ if (pubMsgs.isEmpty()) { // everything matched: report completion through the status queue passed as ctx
+ LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
+ try {
+ statusQueue.put(true);
+ } catch (InterruptedException e) {
+ throw (failureException = new RuntimeException(e));
+ }
+ } else { // more messages expected: scan the message that follows the one just received
+ long seqId = recvMessage.getMsgId().getLocalComponent();
+ seqId = persistenceManager.getSeqIdAfterSkipping(topic, seqId, 1);
+ ScanRequest request = new ScanRequest(topic, seqId, new PointScanVerifierListener(pubMsgs, topic), ctx);
+ persistenceManager.scanSingleMessage(request);
+ }
+ }
+
+ public void scanFailed(Object ctx, Exception exception) {
+ throw (failureException = new RuntimeException(exception));
+ }
+
+ public void scanFinished(Object ctx, ReasonForFinish reason) {
+ // intentionally empty: completion is signalled from messageScanned once every expected message has matched
+ }
+
+ }
+
+ class ScanVerifier implements Runnable { // scans one topic and waits until the listener reports that all messages were verified
+ List<Message> pubMsgs; // messages the scan is expected to return, in order
+ ByteString topic;
+ LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); // passed as ctx; the listener puts true when done
+
+ public ScanVerifier(ByteString topic, List<Message> pubMsgs) {
+ this.topic = topic;
+ this.pubMsgs = pubMsgs;
+ }
+
+ public void run() {
+ // start the scan
+ try {
+ if (persistenceManager instanceof PersistenceManagerWithRangeScan) {
+ // range scan: one request starting at the lowest seq id, with NUM_MESSAGES_TO_TEST + 1 and Long.MAX_VALUE as limits
+ ScanCallback listener = new RangeScanVerifierListener(pubMsgs);
+
+ PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
+
+ rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
+ NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue)); // presumably (message limit, size limit) — confirm against RangeScanRequest
+
+ } else {
+ // point scan: request only the first message; PointScanVerifierListener issues the follow-up requests itself
+ ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
+ persistenceManager
+ .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
+
+ }
+ // now listen for it to finish
+ // wait a maximum of a minute
+ Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
+ if (b == null) { // poll returned nothing within the timeout
+ throw (failureException = new RuntimeException("Scanning timed out"));
+ }
+ } catch (InterruptedException e) {
+ throw (failureException = new RuntimeException(e));
+ }
+ }
+ }
+
+ class Publisher implements Runnable { // persists the given messages to one topic, one at a time
+ List<Message> pubMsgs; // messages to persist, in publication order
+ ByteString topic;
+
+ public Publisher(ByteString topic, List<Message> pubMsgs) {
+ this.pubMsgs = pubMsgs;
+ this.topic = topic;
+ }
+
+ public void run() {
+ LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>(); // passed as ctx; testCallback puts true on success
+ // publish sequentially: each persist must be acknowledged before the next message is sent
+ for (Message msg : pubMsgs) {
+
+ try {
+ persistenceManager.persistMessage(new PersistRequest(topic, msg, testCallback, statusQueue));
+ // wait a maximum of a minute
+ Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
+ if (b == null) { // no acknowledgement arrived within the timeout
+ throw (failureException = new RuntimeException("Publishing timed out"));
+ }
+ } catch (InterruptedException e) {
+ throw (failureException = new RuntimeException(e));
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ logger.info("STARTING " + getName());
+ persistenceManager = instantiatePersistenceManager(); // the concrete subclass decides which implementation is under test
+ failureException = null; // clear any failure left over from a previous test method
+ logger.info("Persistence Manager test setup finished");
+ }
+
+ abstract long getLowestSeqId();
+
+ abstract PersistenceManager instantiatePersistenceManager() throws Exception;
+
+ @Override
+ protected void tearDown() throws Exception {
+ logger.info("tearDown starting");
+ super.tearDown(); // only logs around the superclass tearDown; subclasses add their own cleanup
+ logger.info("FINISHED " + getName());
+ }
+
+ protected ByteString getTopicName(int number) { // builds the UTF-8 topic name "topic<number>" used by the test
+ return ByteString.copyFromUtf8("topic" + number);
+ }
+
+ @Test
+ public void testPersistenceManager() throws Exception {
+ List<Thread> publisherThreads = new LinkedList<Thread>();
+ List<Thread> scannerThreads = new LinkedList<Thread>();
+ Thread thread;
+ Semaphore latch = new Semaphore(1); // single permit, used below to wait for the acquiredTopic callback
+
+ for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
+ ByteString topic = getTopicName(i);
+
+ if (persistenceManager instanceof TopicOwnershipChangeListener) {
+ // managers that track topic ownership must be told about the topic before it is used
+ TopicOwnershipChangeListener tocl = (TopicOwnershipChangeListener) persistenceManager;
+ // take the only permit so that the second acquire() below blocks until a callback releases it
+ latch.acquire();
+
+ tocl.acquiredTopic(topic, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ failureException = new RuntimeException(exception);
+ ((Semaphore) ctx).release();
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void res) {
+ ((Semaphore) ctx).release();
+ }
+ }, latch);
+ // wait for the callback, then hand the permit back so the next topic can repeat the sequence
+ latch.acquire();
+ latch.release();
+ if (failureException != null) {
+ throw (Exception) failureException.getCause(); // rethrow the original PubSubException
+ }
+ }
+ List<Message> msgs = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, 1024);
+ // the publisher starts immediately
+ thread = new Thread(new Publisher(topic, msgs));
+ publisherThreads.add(thread);
+ thread.start();
+ // the scanner is only created here; it is started after all publishers have finished
+ thread = new Thread(new ScanVerifier(topic, msgs));
+ scannerThreads.add(thread);
+ }
+ for (Thread t : publisherThreads) {
+ t.join();
+ }
+
+ for (Thread t : scannerThreads) {
+ t.start();
+ }
+
+ for (Thread t : scannerThreads) {
+ t.join();
+ }
+ // callbacks and worker threads report problems through failureException
+ assertNull(failureException);
+ for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
+ assertEquals(getExpectedSeqId(NUM_MESSAGES_TO_TEST),
+ persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent()); // expected value first, per the JUnit convention
+ }
+
+ }
+
+ abstract long getExpectedSeqId(int numPublished);
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.ReadAheadCache;
+
+public class TestReadAheadCacheBlackBox extends TestPersistenceManagerBlackBox { // runs the black-box test against a ReadAheadCache
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ LocalDBPersistenceManager.instance().reset(); // reset the LocalDBPersistenceManager returned by instance() after each test
+ }
+
+ @Override
+ long getExpectedSeqId(int numPublished) {
+ return numPublished; // the expected current seq id equals the number of messages published
+ }
+
+ @Override
+ long getLowestSeqId() {
+ return 1; // scans in the base test start from seq id 1
+ }
+
+ @Override
+ PersistenceManager instantiatePersistenceManager() {
+ return new ReadAheadCache(LocalDBPersistenceManager.instance(), new ServerConfiguration()).start(); // cache over the local DB manager, default configuration
+ }
+
+ public static Test suite() { // JUnit 3 suite entry point covering every test method of this class
+ return new TestSuite(TestReadAheadCacheBlackBox.class);
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.StubScanCallback;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+public class TestReadAheadCacheWhiteBox {
+ ByteString topic = ByteString.copyFromUtf8("testTopic");
+ final static int NUM_MESSAGES = 10;
+ final static int MSG_SIZE = 50;
+ List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES, MSG_SIZE);
+ StubPersistenceManager stubPersistenceManager;
+ ReadAheadCache cacheBasedPersistenceManager;
+ MyServerConfiguration myConf = new MyServerConfiguration();
+
+ class MyReadAheadCache extends ReadAheadCache { // ReadAheadCache whose cache requests run inline on the calling thread
+ public MyReadAheadCache(PersistenceManagerWithRangeScan persistenceManger, ServerConfiguration cfg) {
+ super(persistenceManger, cfg);
+ }
+
+ @Override
+ protected void enqueueWithoutFailure(CacheRequest obj) {
+ // make it perform in the same thread
+ obj.performRequest(); // executed immediately instead of being queued
+ }
+ }
+
+ class MyServerConfiguration extends ServerConfiguration {
+ // ServerConfiguration whose read-ahead and cache limits are plain fields that individual tests can change.
+ // Note these are set up, so that the size limit will be reached before
+ // the count limit
+ int readAheadCount = NUM_MESSAGES / 2; // 5 messages with NUM_MESSAGES = 10
+ long readAheadSize = (long) (MSG_SIZE * 2.5); // 125 bytes with MSG_SIZE = 50, i.e. two and a half messages
+ long maxCacheSize = Long.MAX_VALUE; // no practical limit by default; testCollectOldCacheEntries lowers it
+
+ @Override
+ public int getReadAheadCount() {
+ return readAheadCount;
+ }
+
+ @Override
+ public long getReadAheadSizeBytes() {
+ return readAheadSize;
+ }
+
+ @Override
+ public long getMaximumCacheSize() {
+ return maxCacheSize;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ stubPersistenceManager = new StubPersistenceManager(); // new stub backing store for every test
+ cacheBasedPersistenceManager = new MyReadAheadCache(stubPersistenceManager, myConf).start(); // cache under test, wrapping the stub
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // nothing to clean up: setUp() builds a new stub and cache for every test
+ }
+
+ @Test
+ public void testPersistMessage() throws Exception{
+ StubCallback<Long> callback = new StubCallback<Long>();
+ PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
+ // failure path: the callback must receive an error and the cache must not contain the message
+ stubPersistenceManager.failure = true; // presumably makes StubPersistenceManager fail requests — confirm against the stub
+ cacheBasedPersistenceManager.persistMessage(request);
+ assertNotNull(ConcurrencyUtils.take(callback.queue).right()); // assumes right() carries the exception — TODO confirm against StubCallback
+
+ CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
+ .getLocalComponent());
+ assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
+ // success path: with the failure flag cleared the same message persists and is cached (checked in the helper)
+ stubPersistenceManager.failure = false;
+ persistMessage(messages.get(0));
+ }
+
+ private void persistMessage(Message msg) throws Exception{ // helper: persists msg through the cache and checks the resulting cache entry
+ StubCallback<Long> callback = new StubCallback<Long>();
+ PersistRequest request = new PersistRequest(topic, msg, callback, null);
+ cacheBasedPersistenceManager.persistMessage(request);
+ assertNotNull(ConcurrencyUtils.take(callback.queue).left()); // assumes left() carries the success result — TODO confirm against StubCallback
+ CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
+ .getLocalComponent()); // key of the most recently assigned seq id for the topic
+ CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
+ assertNotNull(cacheValue);
+ assertFalse(cacheValue.isStub()); // the entry must hold the real message, not a stub
+ assertTrue(HelperMethods.areEqual(cacheValue.getMessage(), msg));
+
+ }
+
+ @Test
+ public void testScanSingleMessage() throws Exception {
+ StubScanCallback callback = new StubScanCallback();
+ ScanRequest request = new ScanRequest(topic, 1, callback, null);
+ stubPersistenceManager.failure = true; // presumably makes StubPersistenceManager fail requests — confirm against the stub
+ // failure path: the scan callback fails and nothing stays in the cache
+ cacheBasedPersistenceManager.scanSingleMessage(request);
+ assertTrue(callback.isFailed());
+ assertEquals(0, cacheBasedPersistenceManager.cache.size()); // assertEquals reports both values on failure
+ // with the failure flag cleared, the scan is expected to leave readAheadCount entries in the cache
+ stubPersistenceManager.failure = false;
+ cacheBasedPersistenceManager.scanSingleMessage(request);
+ assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+ // persisting the first message is expected to complete the pending scan of seq id 1
+ persistMessage(messages.get(0));
+ assertTrue(callback.isSuccess());
+
+ }
+
+ @Test
+ public void testDeliveredUntil() throws Exception{
+ for (Message m : messages) {
+ persistMessage(m);
+ }
+ assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize); // all persisted messages are counted in the cache size
+ long middle = messages.size() / 2;
+ cacheBasedPersistenceManager.deliveredUntil(topic, middle);
+
+ assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size()); // entries up to middle are expected to be gone
+
+ long middle2 = middle - 1;
+ cacheBasedPersistenceManager.deliveredUntil(topic, middle2);
+ // should have no effect
+ assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
+
+ // delivered all messages
+ cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size());
+ // the cache, both indexes and the size accounting should now be empty
+ assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
+ assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+ assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
+ assertTrue(0 == cacheBasedPersistenceManager.presentCacheSize);
+
+ }
+
+ @Test
+ public void testDoReadAhead() {
+ StubScanCallback callback = new StubScanCallback();
+ ScanRequest request = new ScanRequest(topic, 1, callback, null);
+ cacheBasedPersistenceManager.doReadAhead(request);
+ // reading ahead from seq id 1 is expected to create readAheadCount (5) cache entries
+ assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+ // readAheadCount / 2 - 1 evaluates to 1 with integer division, so this repeats the first request
+ request = new ScanRequest(topic, myConf.readAheadCount / 2 - 1, callback, null);
+ cacheBasedPersistenceManager.doReadAhead(request);
+ assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+ // readAheadCount / 2 + 2 evaluates to 4; the expected size below is (int) (1.5 * 5) = 7
+ request = new ScanRequest(topic, myConf.readAheadCount / 2 + 2, callback, null);
+ cacheBasedPersistenceManager.doReadAhead(request);
+ assertEquals((int) (1.5 * myConf.readAheadCount), cacheBasedPersistenceManager.cache.size());
+
+ }
+
+ @Test
+ public void testReadAheadSizeLimit() throws Exception{
+ for (Message m : messages) {
+ persistMessage(m);
+ }
+ cacheBasedPersistenceManager.cache.clear(); // empty the cache map so the scan below has to read from the backing store
+ StubScanCallback callback = new StubScanCallback();
+ ScanRequest request = new ScanRequest(topic, 1, callback, null);
+ cacheBasedPersistenceManager.scanSingleMessage(request);
+ // the size limit applies before the count limit: ceil(125 / 50.0) = 3 entries are expected, not readAheadCount (5)
+ assertTrue(callback.isSuccess());
+ assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
+ .size());
+
+ }
+
+ @Test
+ public void testDoReadAheadStartingFrom() throws Exception{
+ persistMessage(messages.get(0));
+ int readAheadCount = 5;
+ int start = 1;
+ RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
+ readAheadCount);
+ assertNull(readAheadRequest); // presumably because the message just persisted is already cached at the start position — confirm
+
+ StubScanCallback callback = new StubScanCallback();
+ int end = 100;
+ ScanRequest request = new ScanRequest(topic, end, callback, null);
+ cacheBasedPersistenceManager.doReadAhead(request);
+ // starting 2 positions before the read-ahead done at seq id 100: the limit is expected to be end - pos
+ int pos = 98;
+ readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
+ assertEquals(end - pos, readAheadRequest.messageLimit); // expected value first, per the JUnit convention
+
+ end = 200;
+ request = new ScanRequest(topic, end, callback, null);
+ cacheBasedPersistenceManager.doReadAhead(request);
+
+ // too far back
+ pos = 150;
+ readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
+ assertEquals(readAheadCount, readAheadRequest.messageLimit); // the full read-ahead count is expected here
+ }
+
+ @Test
+ public void testAddMessageToCache() {
+ CacheKey key = new CacheKey(topic, 1);
+ cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+ assertEquals(1, cacheBasedPersistenceManager.cache.size());
+ assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize); // size accounting grows by one message
+ assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
+ assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L)); // the per-topic index holds seq id 1
+ // the time index must list the key under the entry's recorded time of addition
+ CacheValue value = cacheBasedPersistenceManager.cache.get(key);
+ assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.get(value.timeOfAddition).contains(key));
+ }
+
+ @Test
+ public void testRemoveMessageFromCache() {
+ CacheKey key = new CacheKey(topic, 1);
+ cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+ cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true); // NOTE(review): meaning of the two boolean flags is not visible here — confirm against ReadAheadCache
+ assertTrue(cacheBasedPersistenceManager.cache.isEmpty()); // removing the only entry must empty the cache and both indexes
+ assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
+ assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+ }
+
+ @Test
+ public void testCollectOldCacheEntries() {
+ int i = 1;
+ for (Message m : messages) {
+ CacheKey key = new CacheKey(topic, i);
+ cacheBasedPersistenceManager.addMessageToCache(key, m, i); // i serves as both the seq id and a distinct time of addition
+ i++;
+ }
+ // shrink the limit to n messages' worth of bytes; collection is expected to leave exactly n entries
+ int n = 2;
+ myConf.maxCacheSize = n * MSG_SIZE;
+ cacheBasedPersistenceManager.collectOldCacheEntries();
+ assertEquals(n, cacheBasedPersistenceManager.cache.size());
+ assertEquals(n, cacheBasedPersistenceManager.timeIndexOfAddition.size()); // one time-index bucket per entry, since every timestamp is distinct
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class StubSubscriptionManager extends InMemorySubscriptionManager { // in-memory subscription manager that can be told to fail subscribe requests
+ boolean fail = false; // when true, serveSubscribeRequest fails every request
+
+ public void setFail(boolean fail) {
+ this.fail = fail;
+ }
+
+ public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
+ super(tm, pm, conf, scheduler);
+ }
+
+ @Override
+ public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+ Callback<MessageSeqId> callback, Object ctx) {
+ if (fail) { // simulated outage: report ServiceDownException and skip the real implementation
+ callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+ return;
+ }
+ super.serveSubscribeRequest(topic, subRequest, consumeSeqId, callback, ctx); // normal path: delegate to InMemorySubscriptionManager
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+public class TestZkSubscriptionManager extends ZooKeeperTestBase {
+ ZkSubscriptionManager sm;
+ ServerConfiguration cfg = new ServerConfiguration();
+ SynchronousQueue<Either<MessageSeqId, PubSubException>> msgIdCallbackQueue = new SynchronousQueue<Either<MessageSeqId, PubSubException>>();
+ SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>();
+
+ Callback<Void> voidCallback;
+ Callback<MessageSeqId> msgIdCallback;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ cfg = new ServerConfiguration();
+ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), null, cfg, scheduler);
+ msgIdCallback = new Callback<MessageSeqId>() {
+ @Override
+ public void operationFailed(Object ctx, final PubSubException exception) {
+ scheduler.execute(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(msgIdCallbackQueue, Either.of((MessageSeqId) null, exception));
+ }
+ });
+ }
+
+ @Override
+ public void operationFinished(Object ctx, final MessageSeqId resultOfOperation) {
+ scheduler.execute(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(msgIdCallbackQueue, Either.of(resultOfOperation, (PubSubException) null));
+ }
+ });
+ }
+ };
+
+ voidCallback = new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, final PubSubException exception) {
+ scheduler.execute(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(BooleanCallbackQueue, Either.of((Boolean) null, exception));
+ }
+ });
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ scheduler.execute(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(BooleanCallbackQueue, Either.of(true, (PubSubException) null));
+ }
+ });
+ }
+ };
+
+ }
+
+ /**
+  * Walks one subscriber on one topic through the subscription manager: requests made
+  * before the topic is acquired, attach/create, consume-pointer updates and unsubscribe,
+  * re-acquiring the topic in between to inspect the state that is loaded again.
+  *
+  * Every asynchronous call reports through {@code msgIdCallback} or {@code voidCallback};
+  * the test blocks on the matching queue to read the outcome before moving on.
+  *
+  * @throws Exception if any step is interrupted or fails unexpectedly
+  */
+ @Test
+ public void testBasics() throws Exception {
+
+     ByteString topic1 = ByteString.copyFromUtf8("topic1");
+     ByteString sub1 = ByteString.copyFromUtf8("sub1");
+
+     //
+     // No topics acquired.
+     //
+     SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(sub1).build();
+     MessageSeqId msgId = MessageSeqId.newBuilder().setLocalComponent(100).build();
+
+     sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+
+     // assertEquals takes (expected, actual); keep that order so failure messages read correctly
+     Assert.assertEquals(PubSubException.ServerNotResponsibleForTopicException.class,
+             ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass());
+
+     sm.unsubscribe(topic1, sub1, voidCallback, null);
+
+     Assert.assertEquals(PubSubException.ServerNotResponsibleForTopicException.class,
+             ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass());
+
+     //
+     // Acquire topic.
+     //
+
+     sm.acquiredTopic(topic1, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+     Assert.assertTrue(sm.top2sub2seq.containsKey(topic1));
+     Assert.assertEquals(0, sm.top2sub2seq.get(topic1).size());
+
+     sm.unsubscribe(topic1, sub1, voidCallback, null);
+     Assert.assertEquals(PubSubException.ClientNotSubscribedException.class,
+             ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass());
+
+     //
+     // Try to attach to a subscription that does not exist yet.
+     //
+     subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
+             .build();
+
+     sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+     Assert.assertEquals(PubSubException.ClientNotSubscribedException.class,
+             ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass());
+
+     // now create
+     subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
+             .build();
+     sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+     Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(msgIdCallbackQueue).left().getLocalComponent());
+     Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+             .getLocalComponent());
+
+     // try to create again
+     sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+     Assert.assertEquals(PubSubException.ClientAlreadySubscribedException.class,
+             ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass());
+     Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+             .getLocalComponent());
+
+     sm.lostTopic(topic1);
+     sm.acquiredTopic(topic1, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+     // try to attach; the existing subscription's seq id is expected back, not msgId1
+     subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
+             .build();
+     MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
+     sm.serveSubscribeRequest(topic1, subRequest, msgId1, msgIdCallback, null);
+     Assert.assertEquals(msgId.getLocalComponent(), msgIdCallbackQueue.take().left().getLocalComponent());
+     Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+             .getLocalComponent());
+
+     // now manipulate the consume ptrs
+     // don't give it enough to have it persist to ZK
+     MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
+             msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
+     sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+     Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+             .getLocalComponent());
+     Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
+             .getLocalComponent());
+
+     // give it more so that it will write to ZK
+     MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
+             msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
+     sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+     sm.lostTopic(topic1);
+     sm.acquiredTopic(topic1, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+     Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+             .getLocalComponent());
+     Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
+             .getLocalComponent());
+
+     // finally unsubscribe
+     sm.unsubscribe(topic1, sub1, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+     // after losing and re-acquiring the topic the subscription must stay gone
+     sm.lostTopic(topic1);
+     sm.acquiredTopic(topic1, voidCallback, null);
+     Assert.assertTrue(BooleanCallbackQueue.take().left());
+     Assert.assertFalse(sm.top2sub2seq.get(topic1).containsKey(sub1));
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class StubTopicManager extends TrivialOwnAllTopicManager {
+
+ boolean shouldOwnEveryNewTopic = false;
+ boolean shouldError = false;
+
+ public void setShouldOwnEveryNewTopic(boolean shouldOwnEveryNewTopic) {
+ this.shouldOwnEveryNewTopic = shouldOwnEveryNewTopic;
+ }
+
+ public void setShouldError(boolean shouldError) {
+ this.shouldError = shouldError;
+ }
+
+ public StubTopicManager(ServerConfiguration conf) throws UnknownHostException {
+ super(conf, Executors.newSingleThreadScheduledExecutor());
+ }
+
+ @Override
+ protected void realGetOwner(ByteString topic, boolean shouldClaim,
+ Callback<HedwigSocketAddress> cb, Object ctx) {
+
+ if (shouldError) {
+ cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+ return;
+ }
+ if (topics.contains(topic) // already own it
+ || shouldOwnEveryNewTopic) {
+ super.realGetOwner(topic, shouldClaim, cb, ctx);
+ return;
+ } else {
+ // return some other address
+ cb.operationFinished(ctx, new HedwigSocketAddress("124.31.0.1:80"));
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.CompositeException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.Pair;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+ /**
+  * Tests for {@link ZkTopicManager}, run against the ZooKeeper client that
+  * {@link ZooKeeperTestBase} creates in its setUp.
+  */
+ public class TestZkTopicManager extends ZooKeeperTestBase {
+
+     /** How long to wait for an ownership notification that is expected NOT to arrive. */
+     private static final long NO_NOTIFICATION_WAIT_MS = 100;
+
+     protected ZkTopicManager tm;
+
+     /**
+      * Callback that publishes each outcome on a {@link SynchronousQueue} as an
+      * {@code Either<result, exception>}. The hand-off is done from a newly started thread,
+      * so the thread invoking the callback is not the one calling into the queue.
+      */
+     protected class CallbackQueue<T> implements Callback<T> {
+         SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
+
+         public SynchronousQueue<Either<T, Exception>> getQueue() {
+             return q;
+         }
+
+         /** Blocks until the next outcome has been published. */
+         public Either<T, Exception> take() throws InterruptedException {
+             return q.take();
+         }
+
+         @Override
+         public void operationFailed(Object ctx, final PubSubException exception) {
+             LOG.error("got exception: " + exception);
+             new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
+                 }
+             }).start();
+         }
+
+         @Override
+         public void operationFinished(Object ctx, final T resultOfOperation) {
+             new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
+                 }
+             }).start();
+         }
+     }
+
+     protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
+     protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
+     protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
+
+     protected ByteString topic = ByteString.copyFromUtf8("topic");
+     protected ServerConfiguration cfg;
+     protected HedwigSocketAddress me;
+     protected ScheduledExecutorService scheduler;
+
+     @Override
+     @Before
+     public void setUp() throws Exception {
+         super.setUp();
+         cfg = new ServerConfiguration();
+         me = cfg.getServerAddr();
+         scheduler = Executors.newSingleThreadScheduledExecutor();
+         tm = new ZkTopicManager(zk, cfg, scheduler);
+     }
+
+     @Test
+     public void testGetOwnerSingle() throws Exception {
+         tm.getOwner(topic, false, addrCbq, null);
+         Assert.assertEquals(me, check(addrCbq.take()));
+     }
+
+     /** Builds a distinct topic name by appending {@code i} to the base topic. */
+     protected ByteString mkTopic(int i) {
+         return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
+     }
+
+     /**
+      * Unwraps a callback outcome.
+      *
+      * @param ex outcome taken from a {@link CallbackQueue}
+      * @return the successful result; may be null (e.g. for {@code Void} callbacks)
+      * @throws Exception the recorded failure, if there is one
+      */
+     protected <T> T check(Either<T, Exception> ex) throws Exception {
+         // Decide on the exception side: a null result is a legitimate success value,
+         // and "throw null" would surface as an unrelated NullPointerException.
+         if (ex.right() != null) {
+             throw ex.right();
+         }
+         return ex.left();
+     }
+
+     /** Server configuration whose only customisation is the server port. */
+     public static class CustomServerConfiguration extends ServerConfiguration {
+         int port;
+
+         public CustomServerConfiguration(int port) {
+             this.port = port;
+         }
+
+         @Override
+         public int getServerPort() {
+             return port;
+         }
+     }
+
+     @Test
+     public void testGetOwnerMulti() throws Exception {
+         ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
+         ServerConfiguration cfg2 = new CustomServerConfiguration(cfg.getServerPort() + 2);
+         // TODO change cfg1 cfg2 params
+         ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler);
+         ZkTopicManager tm2 = new ZkTopicManager(zk, cfg2, scheduler);
+
+         tm.getOwner(topic, false, addrCbq, null);
+         HedwigSocketAddress owner = check(addrCbq.take());
+
+         // If we were told to have another person claim the topic, make them
+         // claim the topic.
+         if (owner.getPort() == cfg1.getServerPort())
+             tm1.getOwner(topic, true, addrCbq, null);
+         else if (owner.getPort() == cfg2.getServerPort())
+             tm2.getOwner(topic, true, addrCbq, null);
+         if (owner.getPort() != cfg.getServerPort())
+             Assert.assertEquals(owner, check(addrCbq.take()));
+
+         // Every manager must keep reporting the same owner.
+         for (int i = 0; i < 100; ++i) {
+             tm.getOwner(topic, false, addrCbq, null);
+             Assert.assertEquals(owner, check(addrCbq.take()));
+
+             tm1.getOwner(topic, false, addrCbq, null);
+             Assert.assertEquals(owner, check(addrCbq.take()));
+
+             tm2.getOwner(topic, false, addrCbq, null);
+             Assert.assertEquals(owner, check(addrCbq.take()));
+         }
+
+         // Give us 100 chances to choose another owner if not shouldClaim.
+         // The verdict is taken after the loop so that the 100th attempt counts too.
+         for (int i = 0; i < 100 && owner.equals(me); ++i) {
+             tm.getOwner(mkTopic(i), false, addrCbq, null);
+             owner = check(addrCbq.take());
+         }
+         Assert.assertFalse("Never chose another owner", owner.equals(me));
+
+         // Make sure we always choose ourselves if shouldClaim. Use a fresh topic per
+         // iteration (100 and up, unused above) so each round is a new claim rather than
+         // a repeated lookup of a topic we already own.
+         for (int i = 0; i < 100; ++i) {
+             tm.getOwner(mkTopic(100 + i), true, addrCbq, null);
+             Assert.assertEquals(me, check(addrCbq.take()));
+         }
+     }
+
+     @Test
+     public void testLoadBalancing() throws Exception {
+         tm.getOwner(topic, false, addrCbq, null);
+
+         Assert.assertEquals(me, check(addrCbq.take()));
+
+         // A second manager on another port; constructing it is all this test needs from it.
+         ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
+         new ZkTopicManager(zk, cfg1, scheduler);
+
+         ByteString topic1 = mkTopic(1);
+         tm.getOwner(topic1, false, addrCbq, null);
+         Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
+
+     }
+
+     /**
+      * Listener that reports every ownership change on a queue as (topic, acquired?) and
+      * can be told to fail acquisitions.
+      */
+     class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
+         boolean failure;
+         SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
+
+         public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
+             this.bsQueue = bsQueue;
+         }
+
+         public void setFailure(boolean failure) {
+             this.failure = failure;
+         }
+
+         @Override
+         public void lostTopic(final ByteString topic) {
+             new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
+                 }
+             }).start();
+         }
+
+         @Override
+         public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
+             new Thread(new Runnable() {
+                 @Override
+                 public void run() {
+                     // publish the notification first, then answer the manager
+                     ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
+                     if (failure) {
+                         callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+                     } else {
+                         callback.operationFinished(ctx, null);
+                     }
+                 }
+             }).start();
+         }
+     }
+
+     @Test
+     public void testOwnershipChange() throws Exception {
+         SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
+
+         StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
+
+         tm.addTopicOwnershipChangeListener(listener);
+
+         // regular acquire
+         tm.getOwner(topic, true, addrCbq, null);
+         Pair<ByteString, Boolean> pair = bsQueue.take();
+         Assert.assertEquals(topic, pair.first());
+         Assert.assertTrue(pair.second());
+         Assert.assertEquals(me, check(addrCbq.take()));
+         assertOwnershipNodeExists();
+
+         // topic that I already own
+         tm.getOwner(topic, true, addrCbq, null);
+         Assert.assertEquals(me, check(addrCbq.take()));
+         assertNoOwnershipNotification(bsQueue);
+         assertOwnershipNodeExists();
+
+         // regular release
+         tm.releaseTopic(topic, cb, null);
+         pair = bsQueue.take();
+         Assert.assertEquals(topic, pair.first());
+         Assert.assertFalse(pair.second());
+         Assert.assertTrue(queue.take());
+         assertOwnershipNodeDoesntExist();
+
+         // releasing topic that I don't own
+         tm.releaseTopic(mkTopic(0), cb, null);
+         Assert.assertTrue(queue.take());
+         assertNoOwnershipNotification(bsQueue);
+
+         // set listener to return error
+         listener.setFailure(true);
+
+         tm.getOwner(topic, true, addrCbq, null);
+         pair = bsQueue.take();
+         Assert.assertEquals(topic, pair.first());
+         Assert.assertTrue(pair.second());
+         Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
+                 .getExceptions().iterator().next().getClass());
+         Assert.assertFalse(tm.topics.contains(topic));
+         // NOTE(review): presumably gives the manager time to remove the ownership node
+         // after the failed acquire; a fixed sleep is timing-sensitive.
+         Thread.sleep(100);
+         assertOwnershipNodeDoesntExist();
+
+     }
+
+     /**
+      * Asserts that the listener has not produced a notification.
+      *
+      * {@code SynchronousQueue.isEmpty()} always returns true, so it cannot be used for
+      * this; instead wait briefly for a producer and require that none shows up.
+      */
+     private void assertNoOwnershipNotification(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue)
+             throws InterruptedException {
+         Assert.assertNull("unexpected ownership notification",
+                 bsQueue.poll(NO_NOTIFICATION_WAIT_MS, java.util.concurrent.TimeUnit.MILLISECONDS));
+     }
+
+     /** Asserts that the hub node for {@code topic} exists and holds this manager's address. */
+     public void assertOwnershipNodeExists() throws Exception {
+         byte[] data = zk.getData(tm.hubPath(topic), false, null);
+         Assert.assertEquals(tm.addr, new HedwigSocketAddress(new String(data)));
+     }
+
+     /** Asserts that reading the hub node for {@code topic} fails with NONODE. */
+     public void assertOwnershipNodeDoesntExist() throws Exception {
+         try {
+             zk.getData(tm.hubPath(topic), false, null);
+         } catch (KeeperException e) {
+             Assert.assertEquals(KeeperException.Code.NONODE, e.code());
+             return;
+         }
+         Assert.fail("ownership node still exists for topic " + topic.toStringUtf8());
+     }
+
+     @Test
+     public void testZKClientDisconnected() throws Exception {
+         // First assert ownership of the topic
+         tm.getOwner(topic, true, addrCbq, null);
+         Assert.assertEquals(me, check(addrCbq.take()));
+
+         // Suspend the ZKTopicManager and make sure calls to getOwner error out
+         tm.isSuspended = true;
+         tm.getOwner(topic, true, addrCbq, null);
+         Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
+         // Release the topic. This should not error out even if suspended.
+         tm.releaseTopic(topic, cb, null);
+         Assert.assertTrue(queue.take());
+         assertOwnershipNodeDoesntExist();
+
+         // Restart the ZKTopicManager and make sure calls to getOwner are okay
+         tm.isSuspended = false;
+         tm.getOwner(topic, true, addrCbq, null);
+         Assert.assertEquals(me, check(addrCbq.take()));
+         assertOwnershipNodeExists();
+     }
+
+ }
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.Arrays;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Assert;
+import org.junit.Test;
+
+ /**
+  * Tests for {@code ZkUtils.createFullPathOptimistic}.
+  */
+ public class TestZkUtils extends ZooKeeperTestBase {
+
+     @Test
+     public void testCreateFullPathOptimistic() throws Exception {
+         testPath("/a/b/c", CreateMode.EPHEMERAL);
+
+         testPath("/b/c/d", CreateMode.PERSISTENT);
+
+         // extends the path created by the previous call by one more level
+         testPath("/b/c/d/e", CreateMode.PERSISTENT);
+
+     }
+
+     /**
+      * Creates {@code path} with a one-byte payload and checks that the creation is
+      * reported as successful and that the payload can be read back from that path.
+      *
+      * @param path full path of the node to create
+      * @param mode create mode passed through to {@code createFullPathOptimistic}
+      */
+     void testPath(String path, CreateMode mode) throws Exception {
+         final byte[] payload = { 77 };
+         ZkUtils.createFullPathOptimistic(zk, path, payload, Ids.OPEN_ACL_UNSAFE, mode, strCb, null);
+
+         // strCb publishes "rc == OK" on the shared queue
+         boolean created = queue.take();
+         Assert.assertTrue(created);
+
+         byte[] stored = zk.getData(path, false, null);
+         Assert.assertTrue(Arrays.equals(payload, stored));
+
+     }
+
+ }
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is a base class for any tests that need a ZooKeeper client/server setup.
+ *
+ */
+public abstract class ZooKeeperTestBase extends ClientBase {
+
+ protected ZooKeeper zk;
+
+ protected SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+
+ protected Callback<Void> cb = new Callback<Void>() {
+
+ @Override
+ public void operationFinished(Object ctx, Void result) {
+ new Thread(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(queue, true);
+ }
+ }).start();
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ new Thread(new Runnable() {
+ public void run() {
+ ConcurrencyUtils.put(queue, false);
+ }
+ }).start();
+ }
+ };
+
+ protected AsyncCallback.StringCallback strCb = new AsyncCallback.StringCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+ ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
+ }
+ };
+
+ protected AsyncCallback.VoidCallback voidCb = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
+ }
+ };
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ zk = createClient();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ zk.close();
+ }
+
+}