You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/01 15:10:17 UTC

svn commit: r1295571 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hedwig/server/integration/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/

Author: ivank
Date: Thu Mar  1 14:10:17 2012
New Revision: 1295571

URL: http://svn.apache.org/viewvc?rev=1295571&view=rev
Log:
BOOKKEEPER-74: Bookkeeper Persistence Manager should give up topic on error (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar  1 14:10:17 2012
@@ -54,6 +54,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-133: Hub server should update subscription state to zookeeper when losing topic or shutting down (Sijie Gou via ivank)
 
+        BOOKKEEPER-74: Bookkeeper Persistence Manager should give up topic on error (sijie via ivank)
+
     IMPROVEMENTS:
 
       bookkeeper-server/

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Mar  1 14:10:17 2012
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -77,6 +78,7 @@ public class BookkeeperPersistenceManage
     private BookKeeper bk;
     private ZooKeeper zk;
     private ServerConfiguration cfg;
+    private TopicManager tm;
 
     static class InMemoryLedgerRange {
         LedgerRange range;
@@ -125,6 +127,10 @@ public class BookkeeperPersistenceManage
          */
         InMemoryLedgerRange currentLedgerRange;
 
+        /**
+         * Flag to release topic when encountering unrecoverable exceptions
+         */
+        AtomicBoolean doRelease = new AtomicBoolean(false);
     }
 
     Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
@@ -148,6 +154,7 @@ public class BookkeeperPersistenceManage
         this.bk = bk;
         this.zk = zk;
         this.cfg = cfg;
+        this.tm = tm;
         queuer = new TopicOpQueuer(executor);
         tm.addTopicOwnershipChangeListener(this);
     }
@@ -359,6 +366,12 @@ public class BookkeeperPersistenceManage
                 return;
             }
 
+            if (topicInfo.doRelease.get()) {
+                request.callback.operationFailed(request.ctx, new PubSubException.ServiceDownException(
+                    "The ownership of the topic is releasing due to unrecoverable issue."));
+                return;
+            }
+
             final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
             MessageSeqId.Builder builder = MessageSeqId.newBuilder();
             if (request.message.hasMsgId()) {
@@ -383,6 +396,20 @@ public class BookkeeperPersistenceManage
                         // To preserve ordering guarantees, we
                         // should give up the topic and not let
                         // other operations through
+                        if (topicInfo.doRelease.compareAndSet(false, true)) {
+                            tm.releaseTopic(request.topic, new Callback<Void>() {
+                                @Override
+                                public void operationFailed(Object ctx, PubSubException exception) {
+                                    logger.error("Exception found on releasing topic " + request.topic.toStringUtf8()
+                                               + " when encountering exception from bookkeeper:", exception);
+                                }
+                                @Override
+                                public void operationFinished(Object ctx, Void resultOfOperation) {
+                                    logger.debug("successfully releasing topic {} when encountering"
+                                                 + " exception from bookkeeper", request.topic.toStringUtf8());
+                                }
+                            }, null);
+                        }
                         request.callback.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
                         return;
                     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu Mar  1 14:10:17 2012
@@ -254,23 +254,12 @@ public class TestHedwigHub extends Hedwi
         }
     }
 
-    protected void publishFirstBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
+    protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
         if (logger.isDebugEnabled())
-            logger.debug("Publishing first batch of messages.");
+            logger.debug("Publishing " + loop + " batch of messages.");
         for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(getTopic(i), getMsg(i), new TestCallback(queue), null);
-            assertTrue(queue.take());
-            if (messagesToBeConsumed)
-                assertTrue(consumeQueue.take());
-        }
-    }
-
-    protected void publishSecondBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
-        if (logger.isDebugEnabled())
-            logger.debug("Publishing second batch of messages.");
-        for (int i = 0; i < batchSize; i++) {
-            publisher.asyncPublish(getTopic(i), getMsg(i + batchSize), new TestCallback(queue), null);
-            assertTrue(queue.take());
+            publisher.asyncPublish(getTopic(i), getMsg(i + loop * batchSize), new TestCallback(queue), null);
+            assertTrue(expected == queue.take());
             if (messagesToBeConsumed)
                 assertTrue(consumeQueue.take());
         }
@@ -387,31 +376,31 @@ public class TestHedwigHub extends Hedwi
     @Test
     public void testServerRedirect() throws Exception {
         int batchSize = 10;
-        publishFirstBatch(batchSize, false);
+        publishBatch(batchSize, true, false, 0);
     }
 
     @Test
     public void testSubscribeAndConsume() throws Exception {
         int batchSize = 10;
         subscribeToTopics(batchSize);
-        publishFirstBatch(batchSize, true);
+        publishBatch(batchSize, true, true, 0);
     }
 
     @Test
     public void testServerFailoverPublishOnly() throws Exception {
         int batchSize = 10;
-        publishFirstBatch(batchSize, false);
+        publishBatch(batchSize, true, false, 0);
         shutDownLastServer();
-        publishSecondBatch(batchSize, false);
+        publishBatch(batchSize, true, false, 1);
     }
 
     @Test
     public void testServerFailover() throws Exception {
         int batchSize = 10;
         subscribeToTopics(batchSize);
-        publishFirstBatch(batchSize, true);
+        publishBatch(batchSize, true, true, 0);
         shutDownLastServer();
-        publishSecondBatch(batchSize, true);
+        publishBatch(batchSize, true, true, 1);
     }
 
     @Test
@@ -690,4 +679,17 @@ public class TestHedwigHub extends Hedwi
         hubClient.close();
     }
 
+    @Test
+    public void testPublishWithBookKeeperError() throws Exception {
+        int batchSize = 10;
+        publishBatch(batchSize, true, false, 0);
+        // stop all bookie servers
+        bktb.stopAllBookieServers();
+        // following publish would fail with NotEnoughBookies
+        publishBatch(batchSize, false, false, 1);
+        // start all bookie servers
+        bktb.startAllBookieServers();
+        // following publish should succeed
+        publishBatch(batchSize, true, false, 1);
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Thu Mar  1 14:10:17 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hedwig.server.persistence;
 
+import java.net.InetAddress;
 import java.io.File;
 import java.util.LinkedList;
 import java.util.List;
@@ -49,6 +50,7 @@ public class BookKeeperTestBase extends 
 
     // BookKeeper Server variables
     private List<BookieServer> bookiesList;
+    private List<ServerConfiguration> bkConfsList;
     private int initialPort = 5000;
     private int nextPort = initialPort;
 
@@ -101,6 +103,7 @@ public class BookKeeperTestBase extends 
 
         // Create Bookie Servers
         bookiesList = new LinkedList<BookieServer>();
+        bkConfsList = new LinkedList<ServerConfiguration>();
 
         for (int i = 0; i < numBookies; i++) {
             startUpNewBookieServer();
@@ -129,6 +132,27 @@ public class BookKeeperTestBase extends 
         bk.close();
         super.tearDown();
     }
+
+    public void stopAllBookieServers() throws Exception {
+        try {
+            for (BookieServer bs : bookiesList) {
+                bs.shutdown();
+            }
+            bookiesList.clear();
+        } catch (InterruptedException e) {
+            LOG.error("Error stopping all bookie servers", e);
+        }
+    }
+
+    public void startAllBookieServers() throws Exception {
+        try {
+            for (ServerConfiguration conf : bkConfsList) {
+                bookiesList.add(startBookie(conf));
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Error starting all bookie servers", e);
+        }
+    }
     
     public void tearDownOneBookieServer() throws Exception {
         Random r = new Random();
@@ -140,6 +164,7 @@ public class BookKeeperTestBase extends 
             LOG.error("Error tearing down", e);
         }
         bookiesList.remove(bi);
+        bkConfsList.remove(bi);
     }
     
     public void startUpNewBookieServer() throws Exception {
@@ -147,9 +172,27 @@ public class BookKeeperTestBase extends 
                 PREFIX + (nextPort - initialPort), SUFFIX);
         ServerConfiguration conf = newServerConfiguration(
                 nextPort++, hostPort, tmpDir, new File[] { tmpDir });
-        BookieServer bs = new BookieServer(conf);
-        bs.start();
-        bookiesList.add(bs);
+        bookiesList.add(startBookie(conf));
+        bkConfsList.add(conf);
+    }
+
+    /**
+     * Helper method to startup a bookie server using a configuration object
+     *
+     * @param conf
+     *            Server Configuration Object
+     *
+     */
+    private BookieServer startBookie(ServerConfiguration conf) throws Exception {
+        BookieServer server = new BookieServer(conf);
+        server.start();
+
+        int port = conf.getBookiePort();
+        while(zk.exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+            Thread.sleep(500);
+        }
+
+        return server;
     }
 
     protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {