You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/03/01 15:10:17 UTC
svn commit: r1295571 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/persistence/
hedwig-server/src/test/java/org/apache/hedwig/server/integration/
hedwig-server/src/test/java/org/apache/hedwig/server/persistence/
Author: ivank
Date: Thu Mar 1 14:10:17 2012
New Revision: 1295571
URL: http://svn.apache.org/viewvc?rev=1295571&view=rev
Log:
BOOKKEEPER-74: Bookkeeper Persistence Manager should give up topic on error (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Mar 1 14:10:17 2012
@@ -54,6 +54,8 @@ Trunk (unreleased changes)
BOOKKEEPER-133: Hub server should update subscription state to zookeeper when losing topic or shutting down (Sijie Gou via ivank)
+ BOOKKEEPER-74: Bookkeeper Persistence Manager should give up topic on error (sijie via ivank)
+
IMPROVEMENTS:
bookkeeper-server/
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.java Thu Mar 1 14:10:17 2012
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -77,6 +78,7 @@ public class BookkeeperPersistenceManage
private BookKeeper bk;
private ZooKeeper zk;
private ServerConfiguration cfg;
+ private TopicManager tm;
static class InMemoryLedgerRange {
LedgerRange range;
@@ -125,6 +127,10 @@ public class BookkeeperPersistenceManage
*/
InMemoryLedgerRange currentLedgerRange;
+ /**
+ * Flag to release topic when encountering unrecoverable exceptions
+ */
+ AtomicBoolean doRelease = new AtomicBoolean(false);
}
Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
@@ -148,6 +154,7 @@ public class BookkeeperPersistenceManage
this.bk = bk;
this.zk = zk;
this.cfg = cfg;
+ this.tm = tm;
queuer = new TopicOpQueuer(executor);
tm.addTopicOwnershipChangeListener(this);
}
@@ -359,6 +366,12 @@ public class BookkeeperPersistenceManage
return;
}
+ if (topicInfo.doRelease.get()) {
+ request.callback.operationFailed(request.ctx, new PubSubException.ServiceDownException(
+ "The ownership of the topic is releasing due to unrecoverable issue."));
+ return;
+ }
+
final long localSeqId = topicInfo.lastSeqIdPushed.getLocalComponent() + 1;
MessageSeqId.Builder builder = MessageSeqId.newBuilder();
if (request.message.hasMsgId()) {
@@ -383,6 +396,20 @@ public class BookkeeperPersistenceManage
// To preserve ordering guarantees, we
// should give up the topic and not let
// other operations through
+ if (topicInfo.doRelease.compareAndSet(false, true)) {
+ tm.releaseTopic(request.topic, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ logger.error("Exception found on releasing topic " + request.topic.toStringUtf8()
+ + " when encountering exception from bookkeeper:", exception);
+ }
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ logger.debug("successfully releasing topic {} when encountering"
+ + " exception from bookkeeper", request.topic.toStringUtf8());
+ }
+ }, null);
+ }
request.callback.operationFailed(ctx, new PubSubException.ServiceDownException(bke));
return;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/integration/TestHedwigHub.java Thu Mar 1 14:10:17 2012
@@ -254,23 +254,12 @@ public class TestHedwigHub extends Hedwi
}
}
- protected void publishFirstBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
+ protected void publishBatch(int batchSize, boolean expected, boolean messagesToBeConsumed, int loop) throws Exception {
if (logger.isDebugEnabled())
- logger.debug("Publishing first batch of messages.");
+ logger.debug("Publishing " + loop + " batch of messages.");
for (int i = 0; i < batchSize; i++) {
- publisher.asyncPublish(getTopic(i), getMsg(i), new TestCallback(queue), null);
- assertTrue(queue.take());
- if (messagesToBeConsumed)
- assertTrue(consumeQueue.take());
- }
- }
-
- protected void publishSecondBatch(int batchSize, boolean messagesToBeConsumed) throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Publishing second batch of messages.");
- for (int i = 0; i < batchSize; i++) {
- publisher.asyncPublish(getTopic(i), getMsg(i + batchSize), new TestCallback(queue), null);
- assertTrue(queue.take());
+ publisher.asyncPublish(getTopic(i), getMsg(i + loop * batchSize), new TestCallback(queue), null);
+ assertTrue(expected == queue.take());
if (messagesToBeConsumed)
assertTrue(consumeQueue.take());
}
@@ -387,31 +376,31 @@ public class TestHedwigHub extends Hedwi
@Test
public void testServerRedirect() throws Exception {
int batchSize = 10;
- publishFirstBatch(batchSize, false);
+ publishBatch(batchSize, true, false, 0);
}
@Test
public void testSubscribeAndConsume() throws Exception {
int batchSize = 10;
subscribeToTopics(batchSize);
- publishFirstBatch(batchSize, true);
+ publishBatch(batchSize, true, true, 0);
}
@Test
public void testServerFailoverPublishOnly() throws Exception {
int batchSize = 10;
- publishFirstBatch(batchSize, false);
+ publishBatch(batchSize, true, false, 0);
shutDownLastServer();
- publishSecondBatch(batchSize, false);
+ publishBatch(batchSize, true, false, 1);
}
@Test
public void testServerFailover() throws Exception {
int batchSize = 10;
subscribeToTopics(batchSize);
- publishFirstBatch(batchSize, true);
+ publishBatch(batchSize, true, true, 0);
shutDownLastServer();
- publishSecondBatch(batchSize, true);
+ publishBatch(batchSize, true, true, 1);
}
@Test
@@ -690,4 +679,17 @@ public class TestHedwigHub extends Hedwi
hubClient.close();
}
+ @Test
+ public void testPublishWithBookKeeperError() throws Exception {
+ int batchSize = 10;
+ publishBatch(batchSize, true, false, 0);
+ // stop all bookie servers
+ bktb.stopAllBookieServers();
+ // following publish would failed with NotEnoughBookies
+ publishBatch(batchSize, false, false, 1);
+ // start all bookie servers
+ bktb.startAllBookieServers();
+ // following publish should succeed
+ publishBatch(batchSize, true, false, 1);
+ }
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1295571&r1=1295570&r2=1295571&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Thu Mar 1 14:10:17 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hedwig.server.persistence;
+import java.net.InetAddress;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
@@ -49,6 +50,7 @@ public class BookKeeperTestBase extends
// BookKeeper Server variables
private List<BookieServer> bookiesList;
+ private List<ServerConfiguration> bkConfsList;
private int initialPort = 5000;
private int nextPort = initialPort;
@@ -101,6 +103,7 @@ public class BookKeeperTestBase extends
// Create Bookie Servers
bookiesList = new LinkedList<BookieServer>();
+ bkConfsList = new LinkedList<ServerConfiguration>();
for (int i = 0; i < numBookies; i++) {
startUpNewBookieServer();
@@ -129,6 +132,27 @@ public class BookKeeperTestBase extends
bk.close();
super.tearDown();
}
+
+ public void stopAllBookieServers() throws Exception {
+ try {
+ for (BookieServer bs : bookiesList) {
+ bs.shutdown();
+ }
+ bookiesList.clear();
+ } catch (InterruptedException e) {
+ LOG.error("Error stopping all bookie servers", e);
+ }
+ }
+
+ public void startAllBookieServers() throws Exception {
+ try {
+ for (ServerConfiguration conf : bkConfsList) {
+ bookiesList.add(startBookie(conf));
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error starting all bookie servers", e);
+ }
+ }
public void tearDownOneBookieServer() throws Exception {
Random r = new Random();
@@ -140,6 +164,7 @@ public class BookKeeperTestBase extends
LOG.error("Error tearing down", e);
}
bookiesList.remove(bi);
+ bkConfsList.remove(bi);
}
public void startUpNewBookieServer() throws Exception {
@@ -147,9 +172,27 @@ public class BookKeeperTestBase extends
PREFIX + (nextPort - initialPort), SUFFIX);
ServerConfiguration conf = newServerConfiguration(
nextPort++, hostPort, tmpDir, new File[] { tmpDir });
- BookieServer bs = new BookieServer(conf);
- bs.start();
- bookiesList.add(bs);
+ bookiesList.add(startBookie(conf));
+ bkConfsList.add(conf);
+ }
+
+ /**
+ * Helper method to startup a bookie server using a configuration object
+ *
+ * @param conf
+ * Server Configuration Object
+ *
+ */
+ private BookieServer startBookie(ServerConfiguration conf) throws Exception {
+ BookieServer server = new BookieServer(conf);
+ server.start();
+
+ int port = conf.getBookiePort();
+ while(zk.exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+ Thread.sleep(500);
+ }
+
+ return server;
}
protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {