You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [16/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestPersistenceManagerBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
+import org.apache.hedwig.util.Callback;
+
+public abstract class TestPersistenceManagerBlackBox extends TestCase {
+    protected PersistenceManager persistenceManager;
+    protected int NUM_MESSAGES_TO_TEST = 5;
+    protected int NUM_TOPICS_TO_TEST = 5;
+    static Logger logger = Logger.getLogger(TestPersistenceManagerBlackBox.class);
+    TestCallback testCallback = new TestCallback();
+
+    RuntimeException failureException;
+
+    class TestCallback implements Callback<Long> {
+        // Callback given to every PersistRequest issued by Publisher; ctx is that publisher's status queue.
+        public void operationFailed(Object ctx, PubSubException exception) {
+            throw (failureException = new RuntimeException(exception)); // record for the final assertion, then rethrow
+        }
+
+        @SuppressWarnings("unchecked")
+        public void operationFinished(Object ctx, Long resultOfOperation) {
+            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
+            try {
+                statusQueue.put(true); // wakes the thread that is polling this queue
+            } catch (InterruptedException e) {
+                throw (failureException = new RuntimeException(e));
+            }
+        }
+    }
+
+    class RangeScanVerifierListener implements ScanCallback { // checks a range scan against the published messages
+        List<Message> pubMsgs; // messages still expected, in order; the head is removed once it has been matched
+
+        public RangeScanVerifierListener(List<Message> pubMsgs) {
+            this.pubMsgs = pubMsgs;
+        }
+
+        public void messageScanned(Object ctx, Message recvMessage) {
+            if (pubMsgs.isEmpty()) {
+                throw (failureException = new RuntimeException("Message received when none expected"));
+            }
+            // every scanned message must equal the oldest expected message that has not been matched yet
+            Message pubMsg = pubMsgs.get(0);
+            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
+                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
+            }
+            pubMsgs.remove(0);
+        }
+
+        public void scanFailed(Object ctx, Exception exception) {
+            throw (failureException = new RuntimeException(exception));
+        }
+
+        @SuppressWarnings("unchecked")
+        public void scanFinished(Object ctx, ReasonForFinish reason) {
+            if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
+                throw (failureException = new RuntimeException("Scan finished prematurely " + reason));
+            }
+            LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx; // the queue ScanVerifier polls
+            try {
+                statusQueue.put(true);
+            } catch (InterruptedException e) {
+                throw (failureException = new RuntimeException(e));
+            }
+        }
+        // NOTE(review): scanFinished does not check that pubMsgs was fully consumed - confirm this is intended.
+    }
+
+    class PointScanVerifierListener implements ScanCallback {
+        List<Message> pubMsgs;
+        ByteString topic;
+        // Verifies one message per scanSingleMessage call and issues the next call until pubMsgs is drained.
+        public PointScanVerifierListener(List<Message> pubMsgs, ByteString topic) {
+            this.topic = topic;
+            this.pubMsgs = pubMsgs;
+        }
+
+        @SuppressWarnings("unchecked")
+        public void messageScanned(Object ctx, Message recvMessage) {
+            // compare against the oldest expected message, then drop it from the list
+            Message pubMsg = pubMsgs.get(0);
+            if (!HelperMethods.areEqual(recvMessage, pubMsg)) {
+                throw (failureException = new RuntimeException("Scanned message not equal to expected"));
+            }
+            pubMsgs.remove(0);
+            // nothing left to verify: signal the waiting thread; otherwise request the following message
+            if (pubMsgs.isEmpty()) {
+                LinkedBlockingQueue<Boolean> statusQueue = (LinkedBlockingQueue<Boolean>) ctx;
+                try {
+                    statusQueue.put(true);
+                } catch (InterruptedException e) {
+                    throw (failureException = new RuntimeException(e));
+                }
+            } else {
+                long seqId = recvMessage.getMsgId().getLocalComponent();
+                seqId = persistenceManager.getSeqIdAfterSkipping(topic, seqId, 1); // presumably the id one message later - confirm
+                ScanRequest request = new ScanRequest(topic, seqId, new PointScanVerifierListener(pubMsgs, topic), ctx);
+                persistenceManager.scanSingleMessage(request);
+            }
+
+        }
+
+        public void scanFailed(Object ctx, Exception exception) {
+            throw (failureException = new RuntimeException(exception));
+        }
+
+        public void scanFinished(Object ctx, ReasonForFinish reason) {
+            // nothing to do: completion is signalled from messageScanned once pubMsgs is empty
+        }
+
+    }
+
+    class ScanVerifier implements Runnable {
+        List<Message> pubMsgs;
+        ByteString topic;
+        LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
+        // Scans `topic` from getLowestSeqId() and waits for the listener to signal completion on statusQueue.
+        public ScanVerifier(ByteString topic, List<Message> pubMsgs) {
+            this.topic = topic;
+            this.pubMsgs = pubMsgs;
+        }
+
+        public void run() {
+            // start the scan
+            try {
+                if (persistenceManager instanceof PersistenceManagerWithRangeScan) {
+                    // one range scan covers everything; the listener signals from scanFinished
+                    ScanCallback listener = new RangeScanVerifierListener(pubMsgs);
+
+                    PersistenceManagerWithRangeScan rangePersistenceManager = (PersistenceManagerWithRangeScan) persistenceManager;
+                    // NOTE(review): the limit is one more than was published, presumably so the scan ends with NO_MORE_MESSAGES - confirm
+                    rangePersistenceManager.scanMessages(new RangeScanRequest(topic, getLowestSeqId(),
+                            NUM_MESSAGES_TO_TEST + 1, Long.MAX_VALUE, listener, statusQueue));
+
+                } else {
+                    // point scans: the listener re-issues scanSingleMessage itself after each message
+                    ScanCallback listener = new PointScanVerifierListener(pubMsgs, topic);
+                    persistenceManager
+                            .scanSingleMessage(new ScanRequest(topic, getLowestSeqId(), listener, statusQueue));
+
+                }
+                // now listen for it to finish
+                // wait a maximum of a minute
+                Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
+                if (b == null) {
+                    throw (failureException = new RuntimeException("Scanning timed out"));
+                }
+            } catch (InterruptedException e) {
+                throw (failureException = new RuntimeException(e));
+            }
+        }
+    }
+
+    class Publisher implements Runnable {
+        List<Message> pubMsgs;
+        ByteString topic;
+        // Persists pubMsgs to topic one at a time, waiting for each acknowledgement before sending the next.
+        public Publisher(ByteString topic, List<Message> pubMsgs) {
+            this.pubMsgs = pubMsgs;
+            this.topic = topic;
+        }
+
+        public void run() {
+            LinkedBlockingQueue<Boolean> statusQueue = new LinkedBlockingQueue<Boolean>();
+            // testCallback puts `true` on statusQueue (passed as ctx) when a persist succeeds
+            for (Message msg : pubMsgs) {
+                // on failure testCallback throws instead of signalling, so the poll below can only time out
+                try {
+                    persistenceManager.persistMessage(new PersistRequest(topic, msg, testCallback, statusQueue));
+                    // wait a maximum of a minute
+                    Boolean b = statusQueue.poll(60, TimeUnit.SECONDS);
+                    if (b == null) {
+                        throw (failureException = new RuntimeException("Persisting timed out"));
+                    }
+                } catch (InterruptedException e) {
+                    throw (failureException = new RuntimeException(e));
+                }
+            }
+        }
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        logger.info("STARTING " + getName());
+        persistenceManager = instantiatePersistenceManager(); // supplied by the concrete subclass
+        failureException = null; // start every test with no recorded failure
+        logger.info("Persistence Manager test setup finished");
+    }
+
+    abstract long getLowestSeqId();
+
+    abstract PersistenceManager instantiatePersistenceManager() throws Exception;
+
+    @Override
+    protected void tearDown() throws Exception {
+        logger.info("tearDown starting");
+        super.tearDown(); // this class only adds logging around the inherited tearDown
+        logger.info("FINISHED " + getName());
+    }
+
+    protected ByteString getTopicName(int number) { // UTF-8 bytes of "topic0", "topic1", ...
+        return ByteString.copyFromUtf8("topic" + number);
+    }
+
+    @Test
+    public void testPersistenceManager() throws Exception {
+        List<Thread> publisherThreads = new LinkedList<Thread>();
+        List<Thread> scannerThreads = new LinkedList<Thread>();
+        Thread thread;
+        Semaphore latch = new Semaphore(1);
+        // One publisher and one scan verifier per topic; the scanners start only after every publisher is done.
+        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
+            ByteString topic = getTopicName(i);
+
+            if (persistenceManager instanceof TopicOwnershipChangeListener) {
+                // announce ownership of the topic and wait until the manager's callback has run
+                TopicOwnershipChangeListener tocl = (TopicOwnershipChangeListener) persistenceManager;
+
+                latch.acquire();
+
+                tocl.acquiredTopic(topic, new Callback<Void>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        failureException = new RuntimeException(exception);
+                        ((Semaphore) ctx).release();
+                    }
+
+                    @Override
+                    public void operationFinished(Object ctx, Void res) {
+                        ((Semaphore) ctx).release();
+                    }
+                }, latch);
+                // blocks until one of the callbacks above released the permit, then hands it back for the next topic
+                latch.acquire();
+                latch.release();
+                if (failureException != null) {
+                    throw (Exception) failureException.getCause();
+                }
+            }
+            List<Message> msgs = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES_TO_TEST, 1024);
+
+            thread = new Thread(new Publisher(topic, msgs));
+            publisherThreads.add(thread);
+            thread.start();
+
+            thread = new Thread(new ScanVerifier(topic, msgs));
+            scannerThreads.add(thread);
+        }
+        for (Thread t : publisherThreads) {
+            t.join();
+        }
+
+        for (Thread t : scannerThreads) {
+            t.start();
+        }
+
+        for (Thread t : scannerThreads) {
+            t.join();
+        }
+
+        assertNull(failureException);
+        for (int i = 0; i < NUM_TOPICS_TO_TEST; i++) {
+            // expected value goes first so a mismatch is reported the right way round
+            assertEquals(getExpectedSeqId(NUM_MESSAGES_TO_TEST),
+                    persistenceManager.getCurrentSeqIdForTopic(getTopicName(i)).getLocalComponent());
+        }
+    }
+
+    abstract long getExpectedSeqId(int numPublished);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheBlackBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.ReadAheadCache;
+
+public class TestReadAheadCacheBlackBox extends TestPersistenceManagerBlackBox {
+    // Runs the inherited black-box test against a ReadAheadCache wrapping LocalDBPersistenceManager.instance().
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        LocalDBPersistenceManager.instance().reset(); // presumably clears the shared instance between tests - confirm
+    }
+
+    @Override
+    long getExpectedSeqId(int numPublished) {
+        return numPublished; // the current seq id is expected to equal the number of messages published
+    }
+
+    @Override
+    long getLowestSeqId() {
+        return 1; // scans in the base test start from this seq id
+    }
+
+    @Override
+    PersistenceManager instantiatePersistenceManager() {
+        return new ReadAheadCache(LocalDBPersistenceManager.instance(), new ServerConfiguration()).start();
+    }
+    // suite() exposes this class to JUnit 3 style runners
+    public static Test suite() {
+        return new TestSuite(TestReadAheadCacheBlackBox.class);
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/persistence/TestReadAheadCacheWhiteBox.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.HelperMethods;
+import org.apache.hedwig.StubCallback;
+import org.apache.hedwig.StubScanCallback;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.ConcurrencyUtils;
+
+public class TestReadAheadCacheWhiteBox {
+    ByteString topic = ByteString.copyFromUtf8("testTopic");
+    final static int NUM_MESSAGES = 10;
+    final static int MSG_SIZE = 50;
+    List<Message> messages = HelperMethods.getRandomPublishedMessages(NUM_MESSAGES, MSG_SIZE);
+    StubPersistenceManager stubPersistenceManager;
+    ReadAheadCache cacheBasedPersistenceManager;
+    MyServerConfiguration myConf = new MyServerConfiguration();
+
+    class MyReadAheadCache extends ReadAheadCache {
+        public MyReadAheadCache(PersistenceManagerWithRangeScan persistenceManger, ServerConfiguration cfg) {
+            super(persistenceManger, cfg);
+        }
+        // Test subclass: a request handed to enqueueWithoutFailure is executed immediately by the caller.
+        @Override
+        protected void enqueueWithoutFailure(CacheRequest obj) {
+            // make it perform in the same thread
+            obj.performRequest();
+        }
+    }
+
+    class MyServerConfiguration extends ServerConfiguration {
+        // Configuration whose read-ahead and cache limits are plain fields that the tests read and change.
+        // Note these are set up, so that the size limit will be reached before
+        // the count limit
+        int readAheadCount = NUM_MESSAGES / 2; // 5
+        long readAheadSize = (long) (MSG_SIZE * 2.5); // 125
+        long maxCacheSize = Long.MAX_VALUE; // lowered by testCollectOldCacheEntries
+
+        @Override
+        public int getReadAheadCount() {
+            return readAheadCount;
+        }
+
+        @Override
+        public long getReadAheadSizeBytes() {
+            return readAheadSize;
+        }
+
+        @Override
+        public long getMaximumCacheSize() {
+            return maxCacheSize;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        stubPersistenceManager = new StubPersistenceManager(); // fresh backing store for every test
+        cacheBasedPersistenceManager = new MyReadAheadCache(stubPersistenceManager, myConf).start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // nothing to release: setUp builds a new stub and cache before each test
+    }
+
+    @Test
+    public void testPersistMessage() throws Exception{
+        StubCallback<Long> callback = new StubCallback<Long>();
+        PersistRequest request = new PersistRequest(topic, messages.get(0), callback, null);
+        // with the stub failing, the callback must receive a right() value (presumably the error side - confirm)
+        stubPersistenceManager.failure = true;
+        cacheBasedPersistenceManager.persistMessage(request);
+        assertNotNull(ConcurrencyUtils.take(callback.queue).right());
+        // and nothing may have been cached under the topic's current seq id
+        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
+                .getLocalComponent());
+        assertFalse(cacheBasedPersistenceManager.cache.containsKey(key));
+        // the same message must persist and be cached once the stub stops failing
+        stubPersistenceManager.failure = false;
+        persistMessage(messages.get(0));
+    }
+
+    private void persistMessage(Message msg) throws Exception{
+        StubCallback<Long> callback = new StubCallback<Long>();
+        PersistRequest request = new PersistRequest(topic, msg, callback, null);
+        cacheBasedPersistenceManager.persistMessage(request);
+        assertNotNull(ConcurrencyUtils.take(callback.queue).left()); // left() must be set when the persist succeeds
+        CacheKey key = new CacheKey(topic, cacheBasedPersistenceManager.getCurrentSeqIdForTopic(topic)
+                .getLocalComponent());
+        CacheValue cacheValue = cacheBasedPersistenceManager.cache.get(key);
+        assertNotNull(cacheValue);
+        assertFalse(cacheValue.isStub());
+        assertTrue(HelperMethods.areEqual(cacheValue.getMessage(), msg));
+        // helper: persists msg and asserts it is cached, non-stub, under the topic's current seq id
+    }
+
+    @Test
+    public void testScanSingleMessage() throws Exception {
+        StubScanCallback callback = new StubScanCallback();
+        ScanRequest request = new ScanRequest(topic, 1, callback, null);
+        stubPersistenceManager.failure = true;
+        // with the backing store failing, the scan must fail and leave nothing cached
+        cacheBasedPersistenceManager.scanSingleMessage(request);
+        assertTrue(callback.isFailed());
+        assertEquals(0, cacheBasedPersistenceManager.cache.size());
+        // once the store works again, the same scan must leave readAheadCount entries in the cache
+        stubPersistenceManager.failure = false;
+        cacheBasedPersistenceManager.scanSingleMessage(request);
+        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+        // after the message is persisted, the scan callback must report success
+        persistMessage(messages.get(0));
+        assertTrue(callback.isSuccess());
+
+    }
+
+    @Test
+    public void testDeliveredUntil() throws Exception{
+        for (Message m : messages) {
+            persistMessage(m);
+        }
+        assertEquals((long) NUM_MESSAGES * MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+        long middle = messages.size() / 2;
+        cacheBasedPersistenceManager.deliveredUntil(topic, middle);
+        // entries up to `middle` must be gone, the rest must still be cached
+        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
+
+        long middle2 = middle - 1;
+        cacheBasedPersistenceManager.deliveredUntil(topic, middle2);
+        // should have no effect
+        assertEquals(messages.size() - middle, cacheBasedPersistenceManager.cache.size());
+
+        // delivered all messages
+        cacheBasedPersistenceManager.deliveredUntil(topic, (long) messages.size());
+        // the cache, both of its indexes and the size counter must now be empty
+        assertTrue(cacheBasedPersistenceManager.cache.isEmpty());
+        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
+        assertEquals(0L, cacheBasedPersistenceManager.presentCacheSize);
+
+    }
+
+    @Test
+    public void testDoReadAhead() {
+        StubScanCallback callback = new StubScanCallback();
+        ScanRequest request = new ScanRequest(topic, 1, callback, null);
+        cacheBasedPersistenceManager.doReadAhead(request);
+        // a read-ahead from seq id 1 must leave readAheadCount (5) entries in the cache
+        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+        // readAheadCount / 2 - 1 is 1 again, so a repeated read-ahead must not grow the cache
+        request = new ScanRequest(topic, myConf.readAheadCount / 2 - 1, callback, null);
+        cacheBasedPersistenceManager.doReadAhead(request);
+        assertEquals(myConf.readAheadCount, cacheBasedPersistenceManager.cache.size());
+        // a read-ahead from seq id 4 is expected to grow the cache to (int) (1.5 * 5) = 7 entries
+        request = new ScanRequest(topic, myConf.readAheadCount / 2 + 2, callback, null);
+        cacheBasedPersistenceManager.doReadAhead(request);
+        assertEquals((int) (1.5 * myConf.readAheadCount), cacheBasedPersistenceManager.cache.size());
+
+    }
+
+    @Test
+    public void testReadAheadSizeLimit() throws Exception{
+        for (Message m : messages) {
+            persistMessage(m);
+        }
+        cacheBasedPersistenceManager.cache.clear(); // empties only the cache map; the scan below must refill it
+        StubScanCallback callback = new StubScanCallback();
+        ScanRequest request = new ScanRequest(topic, 1, callback, null);
+        cacheBasedPersistenceManager.scanSingleMessage(request);
+        // readAheadSize is 125 and MSG_SIZE is 50, so ceil(125 / 50.0) = 3 entries are expected
+        assertTrue(callback.isSuccess());
+        assertEquals((int) Math.ceil(myConf.readAheadSize / (MSG_SIZE + 0.0)), cacheBasedPersistenceManager.cache
+                .size());
+
+    }
+
+    @Test
+    public void testDoReadAheadStartingFrom() throws Exception{
+        persistMessage(messages.get(0));
+        int readAheadCount = 5;
+        int start = 1;
+        RangeScanRequest readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, start,
+                readAheadCount);
+        assertNull(readAheadRequest);
+        // a message was just persisted above, and no read-ahead request is expected when starting at seq id 1
+        StubScanCallback callback = new StubScanCallback();
+        int end = 100;
+        ScanRequest request = new ScanRequest(topic, end, callback, null);
+        cacheBasedPersistenceManager.doReadAhead(request);
+        // starting 2 ids before the previous read-ahead, the limit is expected to be end - pos
+        int pos = 98;
+        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
+        assertEquals(end - pos, readAheadRequest.messageLimit);
+        // expected value goes first in assertEquals so a mismatch is reported the right way round
+        end = 200;
+        request = new ScanRequest(topic, end, callback, null);
+        cacheBasedPersistenceManager.doReadAhead(request);
+
+        // too far back
+        pos = 150;
+        readAheadRequest = cacheBasedPersistenceManager.doReadAheadStartingFrom(topic, pos, readAheadCount);
+        assertEquals(readAheadCount, readAheadRequest.messageLimit);
+    }
+
+    @Test
+    public void testAddMessageToCache() {
+        CacheKey key = new CacheKey(topic, 1);
+        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+        assertEquals(1, cacheBasedPersistenceManager.cache.size());
+        assertEquals(MSG_SIZE, cacheBasedPersistenceManager.presentCacheSize);
+        assertEquals(1, cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).size());
+        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.get(topic).contains(1L));
+        // the key must also be indexed under the entry's own timeOfAddition
+        CacheValue value = cacheBasedPersistenceManager.cache.get(key);
+        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.get(value.timeOfAddition).contains(key));
+    }
+
+    @Test
+    public void testRemoveMessageFromCache() {
+        CacheKey key = new CacheKey(topic, 1);
+        cacheBasedPersistenceManager.addMessageToCache(key, messages.get(0), System.currentTimeMillis());
+        cacheBasedPersistenceManager.removeMessageFromCache(key, new Exception(), true, true); // NOTE(review): meaning of the two flags is not visible here
+        assertTrue(cacheBasedPersistenceManager.cache.isEmpty()); // removal must clear the cache and both indexes
+        assertTrue(cacheBasedPersistenceManager.orderedIndexOnSeqId.isEmpty());
+        assertTrue(cacheBasedPersistenceManager.timeIndexOfAddition.isEmpty());
+    }
+
+    @Test
+    public void testCollectOldCacheEntries() {
+        int i = 1;
+        for (Message m : messages) {
+            CacheKey key = new CacheKey(topic, i);
+            cacheBasedPersistenceManager.addMessageToCache(key, m, i); // i doubles as seq id and as the time of addition
+            i++;
+        }
+        // shrink the limit to two messages' worth; collection must leave exactly n entries and n time slots
+        int n = 2;
+        myConf.maxCacheSize = n * MSG_SIZE;
+        cacheBasedPersistenceManager.collectOldCacheEntries();
+        assertEquals(n, cacheBasedPersistenceManager.cache.size());
+        assertEquals(n, cacheBasedPersistenceManager.timeIndexOfAddition.size());
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/StubSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class StubSubscriptionManager extends InMemorySubscriptionManager {
+    boolean fail = false; // while true, serveSubscribeRequest fails every request
+
+    public void setFail(boolean fail) {
+        this.fail = fail;
+    }
+
+    public StubSubscriptionManager(TopicManager tm, PersistenceManager pm, ServerConfiguration conf, ScheduledExecutorService scheduler) {
+        super(tm, pm, conf, scheduler);
+    }
+    // Reports ServiceDownException to the callback while `fail` is set; otherwise defers to the superclass.
+    @Override
+    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
+            Callback<MessageSeqId> callback, Object ctx) {
+        if (fail) {
+            callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+            return;
+        }
+        super.serveSubscribeRequest(topic, subRequest, consumeSeqId, callback, ctx);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/subscriptions/TestZkSubscriptionManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.subscriptions;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+public class TestZkSubscriptionManager extends ZooKeeperTestBase {
+    ZkSubscriptionManager sm;
+    ServerConfiguration cfg = new ServerConfiguration();
+    SynchronousQueue<Either<MessageSeqId, PubSubException>> msgIdCallbackQueue = new SynchronousQueue<Either<MessageSeqId, PubSubException>>();
+    SynchronousQueue<Either<Boolean, PubSubException>> BooleanCallbackQueue = new SynchronousQueue<Either<Boolean, PubSubException>>();
+
+    Callback<Void> voidCallback;
+    Callback<MessageSeqId> msgIdCallback;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        cfg = new ServerConfiguration();
+        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+        sm = new ZkSubscriptionManager(zk, new TrivialOwnAllTopicManager(cfg, scheduler), null, cfg, scheduler);
+        msgIdCallback = new Callback<MessageSeqId>() {
+            @Override
+            public void operationFailed(Object ctx, final PubSubException exception) {
+                scheduler.execute(new Runnable() {
+                    public void run() {
+                        ConcurrencyUtils.put(msgIdCallbackQueue, Either.of((MessageSeqId) null, exception));
+                    }
+                });
+            }
+
+            @Override
+            public void operationFinished(Object ctx, final MessageSeqId resultOfOperation) {
+                scheduler.execute(new Runnable() {
+                    public void run() {
+                        ConcurrencyUtils.put(msgIdCallbackQueue, Either.of(resultOfOperation, (PubSubException) null));
+                    }
+                });
+            }
+        };
+
+        voidCallback = new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, final PubSubException exception) {
+                scheduler.execute(new Runnable() {
+                    public void run() {
+                        ConcurrencyUtils.put(BooleanCallbackQueue, Either.of((Boolean) null, exception));
+                    }
+                });
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                scheduler.execute(new Runnable() {
+                    public void run() {
+                        ConcurrencyUtils.put(BooleanCallbackQueue, Either.of(true, (PubSubException) null));
+                    }
+                });
+            }
+        };
+
+    }
+
+    @Test
+    public void testBasics() throws Exception {
+
+        ByteString topic1 = ByteString.copyFromUtf8("topic1");
+        ByteString sub1 = ByteString.copyFromUtf8("sub1");
+
+        //
+        // No topics acquired.
+        //
+        SubscribeRequest subRequest = SubscribeRequest.newBuilder().setSubscriberId(sub1).build();
+        MessageSeqId msgId = MessageSeqId.newBuilder().setLocalComponent(100).build();
+
+        sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+
+        Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
+                PubSubException.ServerNotResponsibleForTopicException.class);
+
+        sm.unsubscribe(topic1, sub1, voidCallback, null);
+
+        Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
+                PubSubException.ServerNotResponsibleForTopicException.class);
+
+        //
+        // Acquire topic.
+        //
+
+        sm.acquiredTopic(topic1, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+        Assert.assertTrue(sm.top2sub2seq.containsKey(topic1));
+        Assert.assertEquals(0, sm.top2sub2seq.get(topic1).size());
+
+        sm.unsubscribe(topic1, sub1, voidCallback, null);
+        Assert.assertEquals(ConcurrencyUtils.take(BooleanCallbackQueue).right().getClass(),
+                PubSubException.ClientNotSubscribedException.class);
+
+        //
+        // Try to attach to a subscription.
+        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
+                .build();
+
+        sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+        Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
+                PubSubException.ClientNotSubscribedException.class);
+
+        // now create
+        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).setSubscriberId(sub1)
+                .build();
+        sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+        Assert.assertEquals(msgId.getLocalComponent(), ConcurrencyUtils.take(msgIdCallbackQueue).left().getLocalComponent());
+        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+                .getLocalComponent());
+
+        // try to create again
+        sm.serveSubscribeRequest(topic1, subRequest, msgId, msgIdCallback, null);
+        Assert.assertEquals(ConcurrencyUtils.take(msgIdCallbackQueue).right().getClass(),
+                PubSubException.ClientAlreadySubscribedException.class);
+        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+                .getLocalComponent());
+
+        sm.lostTopic(topic1);
+        sm.acquiredTopic(topic1, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+        // try to attach
+        subRequest = SubscribeRequest.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH).setSubscriberId(sub1)
+                .build();
+        MessageSeqId msgId1 = MessageSeqId.newBuilder().setLocalComponent(msgId.getLocalComponent() + 10).build();
+        sm.serveSubscribeRequest(topic1, subRequest, msgId1, msgIdCallback, null);
+        Assert.assertEquals(msgId.getLocalComponent(), msgIdCallbackQueue.take().left().getLocalComponent());
+        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+                .getLocalComponent());
+
+        // now manipulate the consume ptrs
+        // dont give it enough to have it persist to ZK
+        MessageSeqId msgId2 = MessageSeqId.newBuilder().setLocalComponent(
+                msgId.getLocalComponent() + cfg.getConsumeInterval() - 1).build();
+        sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId2, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+        Assert.assertEquals(msgId2.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+                .getLocalComponent());
+        Assert.assertEquals(msgId.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
+                .getLocalComponent());
+
+        // give it more so that it will write to ZK
+        MessageSeqId msgId3 = MessageSeqId.newBuilder().setLocalComponent(
+                msgId.getLocalComponent() + cfg.getConsumeInterval() + 1).build();
+        sm.setConsumeSeqIdForSubscriber(topic1, sub1, msgId3, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+        sm.lostTopic(topic1);
+        sm.acquiredTopic(topic1, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+        Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getLastConsumeSeqId()
+                .getLocalComponent());
+        Assert.assertEquals(msgId3.getLocalComponent(), sm.top2sub2seq.get(topic1).get(sub1).getSubscriptionState().getMsgId()
+                .getLocalComponent());
+
+        // finally unsubscribe
+        sm.unsubscribe(topic1, sub1, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+
+        sm.lostTopic(topic1);
+        sm.acquiredTopic(topic1, voidCallback, null);
+        Assert.assertTrue(BooleanCallbackQueue.take().left());
+        Assert.assertFalse(sm.top2sub2seq.get(topic1).containsKey(sub1));
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/StubTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class StubTopicManager extends TrivialOwnAllTopicManager {
+
+    boolean shouldOwnEveryNewTopic = false;
+    boolean shouldError = false;
+
+    public void setShouldOwnEveryNewTopic(boolean shouldOwnEveryNewTopic) {
+        this.shouldOwnEveryNewTopic = shouldOwnEveryNewTopic;
+    }
+
+    public void setShouldError(boolean shouldError) {
+        this.shouldError = shouldError;
+    }
+
+    public StubTopicManager(ServerConfiguration conf) throws UnknownHostException {
+        super(conf, Executors.newSingleThreadScheduledExecutor());
+    }
+
+    @Override
+    protected void realGetOwner(ByteString topic, boolean shouldClaim, 
+            Callback<HedwigSocketAddress> cb, Object ctx) {
+
+        if (shouldError) {
+            cb.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+            return;
+        }
+        if (topics.contains(topic) // already own it
+                || shouldOwnEveryNewTopic) {
+            super.realGetOwner(topic, shouldClaim, cb, ctx);
+            return;
+        } else {
+            // return some other address
+            cb.operationFinished(ctx, new HedwigSocketAddress("124.31.0.1:80"));
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/server/topics/TestZkTopicManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.CompositeException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.apache.hedwig.util.Pair;
+import org.apache.hedwig.zookeeper.ZooKeeperTestBase;
+
+/**
+ * Tests for {@link ZkTopicManager}: owner lookup with one and several
+ * managers, ownership-change notifications, and behaviour while the manager is
+ * marked suspended. Runs against the ZooKeeper client supplied by
+ * {@link ZooKeeperTestBase}.
+ */
+public class TestZkTopicManager extends ZooKeeperTestBase {
+
+    protected ZkTopicManager tm;
+
+    /**
+     * A {@link Callback} that publishes each outcome on a
+     * {@link SynchronousQueue} as an {@link Either}: result on the left for
+     * success, exception on the right for failure. The hand-off happens on a
+     * newly started thread, so the thread invoking the callback does not block
+     * waiting for the test to take the value.
+     */
+    protected class CallbackQueue<T> implements Callback<T> {
+        SynchronousQueue<Either<T, Exception>> q = new SynchronousQueue<Either<T, Exception>>();
+
+        public SynchronousQueue<Either<T, Exception>> getQueue() {
+            return q;
+        }
+
+        /** Blocks until the next outcome is available and returns it. */
+        public Either<T, Exception> take() throws InterruptedException {
+            return q.take();
+        }
+
+        @Override
+        public void operationFailed(Object ctx, final PubSubException exception) {
+            LOG.error("got exception: " + exception);
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConcurrencyUtils.put(q, Either.of((T) null, (Exception) exception));
+                }
+            }).start();
+        }
+
+        @Override
+        public void operationFinished(Object ctx, final T resultOfOperation) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConcurrencyUtils.put(q, Either.of(resultOfOperation, (Exception) null));
+                }
+            }).start();
+        }
+    }
+
+    protected CallbackQueue<HedwigSocketAddress> addrCbq = new CallbackQueue<HedwigSocketAddress>();
+    protected CallbackQueue<ByteString> bsCbq = new CallbackQueue<ByteString>();
+    protected CallbackQueue<Void> voidCbq = new CallbackQueue<Void>();
+
+    protected ByteString topic = ByteString.copyFromUtf8("topic");
+    protected ServerConfiguration cfg;
+    protected HedwigSocketAddress me;
+    protected ScheduledExecutorService scheduler;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        cfg = new ServerConfiguration();
+        me = cfg.getServerAddr();
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+        tm = new ZkTopicManager(zk, cfg, scheduler);
+    }
+
+    /**
+     * Stops the scheduler created in {@link #setUp()} so its worker thread
+     * does not outlive the test, then runs the inherited teardown.
+     */
+    @Override
+    @org.junit.After
+    public void tearDown() throws Exception {
+        try {
+            if (scheduler != null) {
+                scheduler.shutdownNow();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    @Test
+    public void testGetOwnerSingle() throws Exception {
+        tm.getOwner(topic, false, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+    }
+
+    /** Builds a distinct topic name by appending {@code i} to the base topic. */
+    protected ByteString mkTopic(int i) {
+        return ByteString.copyFromUtf8(topic.toStringUtf8() + i);
+    }
+
+    /**
+     * Unwraps a callback outcome.
+     *
+     * @param ex
+     *            outcome taken from a {@link CallbackQueue}
+     * @return the left (success) value, which may be null
+     * @throws Exception
+     *             the right (failure) value, when one is present
+     */
+    protected <T> T check(Either<T, Exception> ex) throws Exception {
+        // Only throw when there really is an exception; a null success value
+        // would otherwise end up executing "throw null".
+        if (ex.right() != null) {
+            throw ex.right();
+        }
+        return ex.left();
+    }
+
+    /** A {@link ServerConfiguration} whose server port is fixed by the test. */
+    public static class CustomServerConfiguration extends ServerConfiguration {
+        int port;
+
+        public CustomServerConfiguration(int port) {
+            this.port = port;
+        }
+
+        @Override
+        public int getServerPort() {
+            return port;
+        }
+    }
+
+    @Test
+    public void testGetOwnerMulti() throws Exception {
+        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
+        ServerConfiguration cfg2 = new CustomServerConfiguration(cfg.getServerPort() + 2);
+        // TODO change cfg1 cfg2 params
+        ZkTopicManager tm1 = new ZkTopicManager(zk, cfg1, scheduler);
+        ZkTopicManager tm2 = new ZkTopicManager(zk, cfg2, scheduler);
+
+        tm.getOwner(topic, false, addrCbq, null);
+        HedwigSocketAddress owner = check(addrCbq.take());
+
+        // If we were told to have another person claim the topic, make them
+        // claim the topic.
+        if (owner.getPort() == cfg1.getServerPort())
+            tm1.getOwner(topic, true, addrCbq, null);
+        else if (owner.getPort() == cfg2.getServerPort())
+            tm2.getOwner(topic, true, addrCbq, null);
+        if (owner.getPort() != cfg.getServerPort())
+            Assert.assertEquals(owner, check(addrCbq.take()));
+
+        // Every manager must keep reporting the same owner.
+        for (int i = 0; i < 100; ++i) {
+            tm.getOwner(topic, false, addrCbq, null);
+            Assert.assertEquals(owner, check(addrCbq.take()));
+
+            tm1.getOwner(topic, false, addrCbq, null);
+            Assert.assertEquals(owner, check(addrCbq.take()));
+
+            tm2.getOwner(topic, false, addrCbq, null);
+            Assert.assertEquals(owner, check(addrCbq.take()));
+        }
+
+        // Give us 100 chances to choose another owner if not shouldClaim.
+        // The verdict is taken after the loop so that a different owner
+        // picked on the very last attempt still counts as success.
+        for (int i = 0; i < 100 && owner.equals(me); ++i) {
+            tm.getOwner(mkTopic(i), false, addrCbq, null);
+            owner = check(addrCbq.take());
+        }
+        Assert.assertFalse("Never chose another owner", owner.equals(me));
+
+        // Make sure we always choose ourselves if shouldClaim.
+        // NOTE(review): every iteration asks for the same topic, mkTopic(100);
+        // confirm whether distinct topics were intended here.
+        for (int i = 0; i < 100; ++i) {
+            tm.getOwner(mkTopic(100), true, addrCbq, null);
+            Assert.assertEquals(me, check(addrCbq.take()));
+        }
+    }
+
+    @Test
+    public void testLoadBalancing() throws Exception {
+        tm.getOwner(topic, false, addrCbq, null);
+
+        Assert.assertEquals(me, check(addrCbq.take()));
+
+        // The second manager is created only for the effect of its
+        // construction; the reference itself is not needed.
+        ServerConfiguration cfg1 = new CustomServerConfiguration(cfg.getServerPort() + 1);
+        new ZkTopicManager(zk, cfg1, scheduler);
+
+        ByteString topic1 = mkTopic(1);
+        tm.getOwner(topic1, false, addrCbq, null);
+        Assert.assertEquals(cfg1.getServerAddr(), check(addrCbq.take()));
+
+    }
+
+    /**
+     * Listener that reports every notification on a queue as a
+     * (topic, acquired?) pair and can be told to fail acquisitions.
+     */
+    class StubOwnershipChangeListener implements TopicOwnershipChangeListener {
+        boolean failure;
+        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue;
+
+        public StubOwnershipChangeListener(SynchronousQueue<Pair<ByteString, Boolean>> bsQueue) {
+            this.bsQueue = bsQueue;
+        }
+
+        public void setFailure(boolean failure) {
+            this.failure = failure;
+        }
+
+        @Override
+        public void lostTopic(final ByteString topic) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, false));
+                }
+            }).start();
+        }
+
+        public void acquiredTopic(final ByteString topic, final Callback<Void> callback, final Object ctx) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    // Publish the notification first, then answer the caller.
+                    ConcurrencyUtils.put(bsQueue, Pair.of(topic, true));
+                    if (failure) {
+                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Asked to fail"));
+                    } else {
+                        callback.operationFinished(ctx, null);
+                    }
+                }
+            }).start();
+        }
+    }
+
+    @Test
+    public void testOwnershipChange() throws Exception {
+        SynchronousQueue<Pair<ByteString, Boolean>> bsQueue = new SynchronousQueue<Pair<ByteString, Boolean>>();
+
+        StubOwnershipChangeListener listener = new StubOwnershipChangeListener(bsQueue);
+
+        tm.addTopicOwnershipChangeListener(listener);
+
+        // regular acquire
+        tm.getOwner(topic, true, addrCbq, null);
+        Pair<ByteString, Boolean> pair = bsQueue.take();
+        Assert.assertEquals(topic, pair.first());
+        Assert.assertTrue(pair.second());
+        Assert.assertEquals(me, check(addrCbq.take()));
+        assertOwnershipNodeExists();
+
+        // topic that I already own
+        tm.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        Assert.assertTrue(bsQueue.isEmpty());
+        assertOwnershipNodeExists();
+
+        // regular release
+        tm.releaseTopic(topic, cb, null);
+        pair = bsQueue.take();
+        Assert.assertEquals(topic, pair.first());
+        Assert.assertFalse(pair.second());
+        Assert.assertTrue(queue.take());
+        assertOwnershipNodeDoesntExist();
+
+        // releasing topic that I don't own
+        tm.releaseTopic(mkTopic(0), cb, null);
+        Assert.assertTrue(queue.take());
+        Assert.assertTrue(bsQueue.isEmpty());
+
+        // set listener to return error
+        listener.setFailure(true);
+
+        tm.getOwner(topic, true, addrCbq, null);
+        pair = bsQueue.take();
+        Assert.assertEquals(topic, pair.first());
+        Assert.assertTrue(pair.second());
+        Assert.assertEquals(PubSubException.ServiceDownException.class, ((CompositeException) addrCbq.take().right())
+                .getExceptions().iterator().next().getClass());
+        Assert.assertFalse(tm.topics.contains(topic));
+        // NOTE(review): fixed pause before checking ZooKeeper; presumably the
+        // ownership node is removed asynchronously - confirm.
+        Thread.sleep(100);
+        assertOwnershipNodeDoesntExist();
+
+    }
+
+    /**
+     * Asserts that the hub node for {@link #topic} exists and holds this
+     * manager's address.
+     */
+    public void assertOwnershipNodeExists() throws Exception {
+        byte[] data = zk.getData(tm.hubPath(topic), false, null);
+        Assert.assertEquals(tm.addr, new HedwigSocketAddress(new String(data)));
+    }
+
+    /**
+     * Asserts that reading the hub node for {@link #topic} fails with
+     * {@code NONODE}.
+     */
+    public void assertOwnershipNodeDoesntExist() throws Exception {
+        try {
+            zk.getData(tm.hubPath(topic), false, null);
+            Assert.fail("Ownership node still exists for topic " + topic.toStringUtf8());
+        } catch (KeeperException e) {
+            Assert.assertEquals(KeeperException.Code.NONODE, e.code());
+        }
+    }
+
+    @Test
+    public void testZKClientDisconnected() throws Exception {
+        // First assert ownership of the topic
+        tm.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+
+        // Suspend the ZKTopicManager and make sure calls to getOwner error out
+        tm.isSuspended = true;
+        tm.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(PubSubException.ServiceDownException.class, addrCbq.take().right().getClass());
+        // Release the topic. This should not error out even if suspended.
+        tm.releaseTopic(topic, cb, null);
+        Assert.assertTrue(queue.take());
+        assertOwnershipNodeDoesntExist();
+
+        // Restart the ZKTopicManager and make sure calls to getOwner are okay
+        tm.isSuspended = false;
+        tm.getOwner(topic, true, addrCbq, null);
+        Assert.assertEquals(me, check(addrCbq.take()));
+        assertOwnershipNodeExists();
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/TestZkUtils.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.Arrays;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@code ZkUtils.createFullPathOptimistic} creates a node at the
+ * requested path holding the supplied data.
+ */
+public class TestZkUtils extends ZooKeeperTestBase {
+
+    @Test
+    public void testCreateFullPathOptimistic() throws Exception {
+        // An ephemeral leaf, a persistent leaf, then a child beneath the
+        // path created by the previous call.
+        testPath("/a/b/c", CreateMode.EPHEMERAL);
+        testPath("/b/c/d", CreateMode.PERSISTENT);
+        testPath("/b/c/d/e", CreateMode.PERSISTENT);
+    }
+
+    /**
+     * Creates {@code path} with a one-byte payload, waits for the creation
+     * callback to report success, and reads the node back to compare its data.
+     *
+     * @param path
+     *            node path to create
+     * @param mode
+     *            create mode for the node
+     */
+    void testPath(String path, CreateMode mode) throws Exception {
+        final byte[] payload = { 77 };
+        ZkUtils.createFullPathOptimistic(zk, path, payload, Ids.OPEN_ACL_UNSAFE, mode, strCb, null);
+
+        // strCb publishes on 'queue' whether the result code was OK.
+        boolean created = queue.take();
+        Assert.assertTrue(created);
+
+        byte[] stored = zk.getData(path, false, null);
+        Assert.assertTrue(Arrays.equals(payload, stored));
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/test/java/org/apache/hedwig/zookeeper/ZooKeeperTestBase.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.zookeeper;
+
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is a base class for any tests that need a ZooKeeper client/server setup.
+ * 
+ */
+public abstract class ZooKeeperTestBase extends ClientBase {
+
+    // Client created in setUp() and closed in tearDown().
+    protected ZooKeeper zk;
+
+    // Rendezvous point on which the callbacks below publish success (true)
+    // or failure (false) for the test thread to take.
+    protected SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+
+    // Hedwig callback: publishes its outcome from a separate thread, so the
+    // thread invoking the callback is not held up until the test takes it.
+    protected Callback<Void> cb = new Callback<Void>() {
+
+        @Override
+        public void operationFinished(Object ctx, Void result) {
+            publishAsync(true);
+        }
+
+        @Override
+        public void operationFailed(Object ctx, PubSubException exception) {
+            publishAsync(false);
+        }
+    };
+
+    // ZooKeeper callbacks: publish directly from the calling thread whether
+    // the result code was OK.
+    protected AsyncCallback.StringCallback strCb = new AsyncCallback.StringCallback() {
+        @Override
+        public void processResult(int rc, String path, Object ctx, String name) {
+            ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
+        }
+    };
+
+    protected AsyncCallback.VoidCallback voidCb = new AsyncCallback.VoidCallback() {
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            ConcurrencyUtils.put(queue, rc == Code.OK.intValue());
+        }
+    };
+
+    /**
+     * Starts a new thread that puts {@code outcome} on {@link #queue}.
+     *
+     * @param outcome
+     *            true for success, false for failure
+     */
+    private void publishAsync(final boolean outcome) {
+        new Thread(new Runnable() {
+            public void run() {
+                ConcurrencyUtils.put(queue, outcome);
+            }
+        }).start();
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        zk = createClient();
+    }
+
+    /**
+     * Closes the client and then runs the inherited teardown.
+     *
+     * The client is closed first and the inherited teardown runs in a finally
+     * block, so it is executed even if closing fails. The null check covers
+     * the case where setUp() failed before a client was created.
+     * NOTE(review): presumably ClientBase.tearDown() stops the test server,
+     * which is why the client is closed beforehand - confirm.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (zk != null) {
+                zk.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+}