You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/18 11:38:46 UTC
svn commit: r1203576 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/topics/
hedwig-server/src/test/java/org/apache/hedwig/server/persistence/
hedwig-server/src/test/java/org/apache/hedwig/server/topics/
Author: ivank
Date: Fri Nov 18 10:38:46 2011
New Revision: 1203576
URL: http://svn.apache.org/viewvc?rev=1203576&view=rev
Log:
BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosting both a bookie server and a hub server) reboots, caused by a race condition in the topic manager (Sijie, ivank via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Nov 18 10:38:46 2011
@@ -86,6 +86,8 @@ BUGFIXES:
BOOKKEEPER-100: Some hedwig tests have build errors (dferro via ivank)
+ BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosts bookie server & hub server) reboot, which is caused by race condition of topic manager (Sijie, ivank via ivank)
+
hedwig-client/
BOOKKEEPER-52: Message sequence confuse due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Fri Nov 18 10:38:46 2011
@@ -136,10 +136,19 @@ public abstract class AbstractTopicManag
}
@Override
- public void operationFailed(Object ctx, PubSubException exception) {
+ public void operationFailed(final Object ctx, final PubSubException exception) {
// TODO: optimization: we can release this as soon as we experience the first error.
- realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
- originalCallback.operationFailed(ctx, exception);
+ Callback<Void> cb = new Callback<Void>() {
+ public void operationFinished(Object _ctx, Void _resultOfOperation) {
+ originalCallback.operationFailed(ctx, exception);
+ }
+ public void operationFailed(Object _ctx, PubSubException _exception) {
+ logger.error("Exception releasing topic", _exception);
+ originalCallback.operationFailed(ctx, exception);
+ }
+ };
+
+ realReleaseTopic(topic, cb, originalContext);
}
};
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Fri Nov 18 10:38:46 2011
@@ -20,6 +20,7 @@ package org.apache.hedwig.server.persist
import java.io.File;
import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,8 +49,8 @@ public class BookKeeperTestBase extends
// BookKeeper Server variables
private List<BookieServer> bookiesList;
- private List<ServerConfiguration> bookieConfsList;
private int initialPort = 5000;
+ private int nextPort = initialPort;
// String constants used for creating the bookie server files.
private static final String PREFIX = "bookie";
@@ -100,16 +101,9 @@ public class BookKeeperTestBase extends
// Create Bookie Servers
bookiesList = new LinkedList<BookieServer>();
- bookieConfsList = new LinkedList<ServerConfiguration>();
for (int i = 0; i < numBookies; i++) {
- File tmpDir = FileUtils.createTempDirectory(PREFIX + i, SUFFIX);
- ServerConfiguration conf = newServerConfiguration(
- initialPort + i, hostPort, tmpDir, new File[] { tmpDir });
- bookieConfsList.add(conf);
- BookieServer bs = new BookieServer(conf);
- bs.start();
- bookiesList.add(bs);
+ startUpNewBookieServer();
}
// Create the BookKeeper client
@@ -135,6 +129,28 @@ public class BookKeeperTestBase extends
bk.close();
super.tearDown();
}
+
+    /**
+     * Shuts down one randomly chosen running bookie server and removes it
+     * from the list of running bookies.
+     *
+     * If the shutdown is interrupted, the error is logged and the calling
+     * thread's interrupt status is restored. Any other exception raised while
+     * shutting down propagates, and the server is then left in the list.
+     *
+     * @throws IllegalStateException if no bookie server is currently running
+     */
+    public void tearDownOneBookieServer() throws Exception {
+        if (bookiesList.isEmpty()) {
+            // Random.nextInt(0) would otherwise fail with an unhelpful message.
+            throw new IllegalStateException("No bookie servers are running");
+        }
+        Random r = new Random();
+        int bi = r.nextInt(bookiesList.size());
+        BookieServer bs = bookiesList.get(bi);
+        try {
+            bs.shutdown();
+        } catch (InterruptedException e) {
+            LOG.error("Error tearing down", e);
+            // Don't swallow the interrupt: callers (e.g. a test thread about to
+            // block on a queue) must still be able to observe it.
+            Thread.currentThread().interrupt();
+        }
+        bookiesList.remove(bi);
+    }
+
+ public void startUpNewBookieServer() throws Exception {
+ File tmpDir = FileUtils.createTempDirectory(
+ PREFIX + (nextPort - initialPort), SUFFIX);
+ ServerConfiguration conf = newServerConfiguration(
+ nextPort++, hostPort, tmpDir, new File[] { tmpDir });
+ BookieServer bs = new BookieServer(conf);
+ bs.start();
+ bookiesList.add(bs);
+ }
protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {
ServerConfiguration conf = new ServerConfiguration(baseConf);
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java?rev=1203576&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java Fri Nov 18 10:38:46 2011
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Regression test for BOOKKEEPER-69: many subscribers concurrently acquiring the
+ * same topic while the bookie ensemble is degraded must not leave the topic
+ * manager owning a topic that the persistence manager failed to acquire.
+ */
+public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
+
+    // Client variables
+    protected HedwigClient client;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
+
+    // Subscriber ids waiting to be (re)submitted by the background thread.
+    final LinkedBlockingQueue<ByteString> subscribers =
+        new LinkedBlockingQueue<ByteString>();
+    final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
+    final int numSubscribers = 300;
+    final AtomicInteger numDone = new AtomicInteger(0);
+
+    // SynchronousQueues to verify async calls
+    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+
+    // Background thread that keeps (re)submitting subscriptions in the second
+    // phase of the test. It loops until interrupted, so tearDown() stops it.
+    private Thread subThread;
+
+    /**
+     * Callback for subscriptions issued by the background thread. A failed
+     * subscription puts its id back on {@link #subscribers} to be retried.
+     * Once {@code numSubscribers} subscriptions have succeeded, the main test
+     * thread waiting on {@link #queue} is released.
+     */
+    class SubCallback implements Callback<Void> {
+
+        ByteString subId;
+
+        public SubCallback(ByteString subId) {
+            this.subId = subId;
+        }
+
+        @Override
+        public void operationFinished(Object ctx,
+                                      Void resultOfOperation) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
+            }
+            int done = numDone.incrementAndGet();
+            if (done == numSubscribers) {
+                ConcurrencyUtils.put(queue, false);
+            }
+        }
+
+        @Override
+        public void operationFailed(Object ctx,
+                                    PubSubException exception) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
+            }
+            // retry this subscriber later from the background thread
+            ConcurrencyUtils.put(subscribers, subId);
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        client = new HedwigClient(new ClientConfiguration());
+
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // Stop the retry thread first so it does not keep issuing subscriptions
+        // against a client and hub that are about to be shut down.
+        if (subThread != null) {
+            subThread.interrupt();
+            subThread.join();
+            subThread = null;
+        }
+
+        client.close();
+        super.tearDown();
+    }
+
+    /**
+     * Phase 1: with one bookie removed, fire {@code numSubscribers} concurrent
+     * subscriptions to one topic and wait for every callback (success or failure).
+     * Phase 2: keep retrying failed subscriptions on a background thread, bring
+     * up a new bookie, and wait until every subscriber has succeeded.
+     */
+    @Test
+    public void testTopicAcquistion() throws Exception {
+        logger.info("Start concurrent topic acquistion test.");
+
+        // let one bookie down to cause not enough bookie exception
+        logger.info("Tear down one bookie server.");
+        bktb.tearDownOneBookieServer();
+
+        // In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
+        // because the tear down bookie server's zk node need time to disappear
+        // some subscriptions will create ledger successfully, then other subscriptions will fail.
+        // the race condition will be: topic manager own topic but persistence manager doesn't
+
+        // numSubscribers subscribers subscribe to the same topic
+        final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
+        numDone.set(0);
+        for (int i = 0; i < numSubscribers; i++) {
+            ByteString subId = ByteString.copyFromUtf8("sub-" + i);
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
+            }
+            subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
+                new Callback<Void>() {
+
+                    private void tick() {
+                        if (numDone.incrementAndGet() == numSubscribers) {
+                            ConcurrencyUtils.put(queue, true);
+                        }
+                    }
+
+                    @Override
+                    public void operationFinished(Object ctx,
+                                                  Void resultOfOperation) {
+                        tick();
+                    }
+
+                    @Override
+                    public void operationFailed(Object ctx,
+                                                PubSubException exception) {
+                        if (exception instanceof PubSubException.ServiceDownException) {
+                            String msg = exception.getMessage();
+                            // The marker may appear at any offset (including 0),
+                            // and an exception message may be null.
+                            if (msg != null && msg.contains("ServerRedirectLoopException")) {
+                                inRedirectLoop.set(true);
+                            }
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Operation failed : ", exception);
+                            }
+                        }
+                        tick();
+                    }
+
+                },
+                null);
+        }
+
+        // wait until every phase-1 subscription has completed (either way)
+        queue.take();
+
+        // TODO: remove comment after we fix the issue
+        // Assert.assertEquals(false, inRedirectLoop.get());
+
+        // start a thread to send subscriptions
+        numDone.set(0);
+        subThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                logger.info("sub thread started");
+                try {
+                    // enqueue all subscribers; SubCallback re-queues the failed ones
+                    for (int i = 0; i < numSubscribers; i++) {
+                        ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
+                        subscribers.put(subscriberId);
+                    }
+
+                    while (true) {
+                        ByteString subId = subscribers.take();
+
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
+                        }
+                        subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
+                                                  new SubCallback(subId), null);
+                    }
+                } catch (InterruptedException ie) {
+                    // tearDown() interrupts this thread to stop the retry loop
+                    logger.warn("Interrupted : ", ie);
+                }
+            }
+
+        });
+        subThread.start();
+        Thread.sleep(2000);
+
+        // start a new bookie server
+        logger.info("start new bookie server");
+        bktb.startUpNewBookieServer();
+
+        // blocks until every subscriber has subscribed successfully
+        queue.take();
+    }
+
+}
+}