You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/18 11:38:46 UTC

svn commit: r1203576 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/topics/ hedwig-server/src/test/java/org/apache/hedwig/server/persistence/ hedwig-server/src/test/java/org/apache/hedwig/server/topics/

Author: ivank
Date: Fri Nov 18 10:38:46 2011
New Revision: 1203576

URL: http://svn.apache.org/viewvc?rev=1203576&view=rev
Log:
BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosts bookie server & hub server) reboot, which is caused by race condition of topic manager (Sijie, ivank via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Nov 18 10:38:46 2011
@@ -86,6 +86,8 @@ BUGFIXES:
 
   BOOKKEEPER-100: Some hedwig tests have build errors (dferro via ivank)
 
+  BOOKKEEPER-69: ServerRedirectLoopException when a machine (hosts bookie server & hub server) reboot, which is caused by race condition of topic manager (Sijie, ivank via ivank)
+
  hedwig-client/
  
   BOOKKEEPER-52: Message sequence confuse due to the subscribeMsgQueue@SubscribeResponseHandler (xulei via ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Fri Nov 18 10:38:46 2011
@@ -136,10 +136,19 @@ public abstract class AbstractTopicManag
             }
 
             @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
+            public void operationFailed(final Object ctx, final PubSubException exception) {
                 // TODO: optimization: we can release this as soon as we experience the first error.
-                realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
-                originalCallback.operationFailed(ctx, exception);
+                Callback<Void> cb = new Callback<Void>() {
+                    public void operationFinished(Object _ctx, Void _resultOfOperation) {
+                        originalCallback.operationFailed(ctx, exception);
+                    }
+                    public void operationFailed(Object _ctx, PubSubException _exception) {
+                        logger.error("Exception releasing topic", _exception);
+                        originalCallback.operationFailed(ctx, exception);
+                    }
+                };
+                
+                realReleaseTopic(topic, cb, originalContext);
             }
         };
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java?rev=1203576&r1=1203575&r2=1203576&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/persistence/BookKeeperTestBase.java Fri Nov 18 10:38:46 2011
@@ -20,6 +20,7 @@ package org.apache.hedwig.server.persist
 import java.io.File;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -48,8 +49,8 @@ public class BookKeeperTestBase extends 
 
     // BookKeeper Server variables
     private List<BookieServer> bookiesList;
-    private List<ServerConfiguration> bookieConfsList;
     private int initialPort = 5000;
+    private int nextPort = initialPort;
 
     // String constants used for creating the bookie server files.
     private static final String PREFIX = "bookie";
@@ -100,16 +101,9 @@ public class BookKeeperTestBase extends 
 
         // Create Bookie Servers
         bookiesList = new LinkedList<BookieServer>();
-        bookieConfsList = new LinkedList<ServerConfiguration>();
 
         for (int i = 0; i < numBookies; i++) {
-            File tmpDir = FileUtils.createTempDirectory(PREFIX + i, SUFFIX);
-            ServerConfiguration conf = newServerConfiguration(
-                initialPort + i, hostPort, tmpDir, new File[] { tmpDir });
-            bookieConfsList.add(conf);
-            BookieServer bs = new BookieServer(conf);
-            bs.start();
-            bookiesList.add(bs);
+            startUpNewBookieServer();
         }
 
         // Create the BookKeeper client
@@ -135,6 +129,28 @@ public class BookKeeperTestBase extends 
         bk.close();
         super.tearDown();
     }
+    
+    public void tearDownOneBookieServer() throws Exception {
+        Random r = new Random();
+        int bi = r.nextInt(bookiesList.size());
+        BookieServer bs = bookiesList.get(bi);
+        try {
+            bs.shutdown();
+        } catch (InterruptedException e) {
+            LOG.error("Error tearing down", e);
+        }
+        bookiesList.remove(bi);
+    }
+    
+    public void startUpNewBookieServer() throws Exception {
+        File tmpDir = FileUtils.createTempDirectory(
+                PREFIX + (nextPort - initialPort), SUFFIX);
+        ServerConfiguration conf = newServerConfiguration(
+                nextPort++, hostPort, tmpDir, new File[] { tmpDir });
+        BookieServer bs = new BookieServer(conf);
+        bs.start();
+        bookiesList.add(bs);
+    }
 
     protected ServerConfiguration newServerConfiguration(int port, String zkServers, File journalDir, File[] ledgerDirs) {
         ServerConfiguration conf = new ServerConfiguration(baseConf);

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java?rev=1203576&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestConcurrentTopicAcquisition.java Fri Nov 18 10:38:46 2011
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+public class TestConcurrentTopicAcquisition extends HedwigHubTestBase {
+
+    // Client variables
+    protected HedwigClient client;
+    protected Publisher publisher;
+    protected Subscriber subscriber;
+    
+    final LinkedBlockingQueue<ByteString> subscribers =
+            new LinkedBlockingQueue<ByteString>();
+    final ByteString topic = ByteString.copyFromUtf8("concurrent-topic");
+    final int numSubscribers = 300;
+    final AtomicInteger numDone = new AtomicInteger(0);
+
+    // SynchronousQueues to verify async calls
+    private final SynchronousQueue<Boolean> queue = new SynchronousQueue<Boolean>();
+    
+    class SubCallback implements Callback<Void> {
+        
+        ByteString subId;
+        
+        public SubCallback(ByteString subId) {
+            this.subId = subId;
+        }
+
+        @Override
+        public void operationFinished(Object ctx,
+                Void resultOfOperation) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " succeed.");
+            }
+            int done = numDone.incrementAndGet();
+            if (done == numSubscribers) {
+                ConcurrencyUtils.put(queue, false);
+            }
+        }
+
+        @Override
+        public void operationFailed(Object ctx,
+                PubSubException exception) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " failed : ", exception);
+            }
+            ConcurrencyUtils.put(subscribers, subId);
+            // ConcurrencyUtils.put(queue, false);
+        }
+    }
+    
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        client = new HedwigClient(new ClientConfiguration());
+
+        publisher = client.getPublisher();
+        subscriber = client.getSubscriber();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // sub.interrupt();
+        // sub.join();
+        
+        client.close();
+        super.tearDown();
+    }
+    
+    @Test
+    public void testTopicAcquistion() throws Exception {
+        logger.info("Start concurrent topic acquistion test.");
+        
+        // let one bookie down to cause not enough bookie exception
+        logger.info("Tear down one bookie server.");
+        bktb.tearDownOneBookieServer();
+        
+        // In current implementation, the first several subscriptions will succeed to put topic in topic manager set,
+        // because the tear down bookie server's zk node need time to disappear
+        // some subscriptions will create ledger successfully, then other subscriptions will fail.
+        // the race condition will be: topic manager own topic but persistence manager doesn't
+        
+        // 300 subscribers subscribe to a same topic
+        final AtomicBoolean inRedirectLoop = new AtomicBoolean(false);
+        numDone.set(0);
+        for (int i=0; i<numSubscribers; i++) {
+            ByteString subId = ByteString.copyFromUtf8("sub-" + i);
+            if (logger.isDebugEnabled()) {
+                logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
+            }
+            subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
+                new Callback<Void>() {
+                
+                    private void tick() {
+                        if (numDone.incrementAndGet() == numSubscribers) {
+                            ConcurrencyUtils.put(queue, true);
+                        }
+                    }
+
+                    @Override
+                    public void operationFinished(Object ctx,
+                            Void resultOfOperation) {
+                        tick();
+                    }
+
+                    @Override
+                    public void operationFailed(Object ctx,
+                            PubSubException exception) {
+                        if (exception instanceof PubSubException.ServiceDownException) {
+                            String msg = exception.getMessage();
+                            if (msg.indexOf("ServerRedirectLoopException") > 0) {
+                                inRedirectLoop.set(true);
+                            }
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Operation failed : ", exception);
+                            }
+                        }
+                        tick(); 
+                    }
+                
+                },
+            null);
+        }
+        
+        queue.take();
+        
+        // TODO: remove comment after we fix the issue
+        // Assert.assertEquals(false, inRedirectLoop.get());
+        
+        // start a thread to send subscriptions
+        numDone.set(0);
+        Thread sub = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                logger.info("sub thread started");
+                try {
+                    // 100 subscribers subscribe to a same topic
+                    for (int i=0; i<numSubscribers; i++) {
+                        ByteString subscriberId = ByteString.copyFromUtf8("sub-" + i);
+                        subscribers.put(subscriberId);
+                    }
+                    
+                    ByteString subId;
+                    while (true) {
+                        subId = subscribers.take();
+                        
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("subscriber " + subId.toStringUtf8() + " subscribes topic " + topic.toStringUtf8());
+                        }
+                        subscriber.asyncSubscribe(topic, subId, CreateOrAttach.CREATE_OR_ATTACH,
+                            new SubCallback(subId), null);
+                    }
+                    // subscriber.asyncSubscribe(topic, subscriberId, mode, callback, context)
+                } catch (InterruptedException ie) {
+                    // break
+                    logger.warn("Interrupted : ", ie);
+                }
+            }
+
+        });
+        sub.start();
+        Thread.sleep(2000);
+        
+        // start a new bookie server
+        logger.info("start new bookie server");
+        bktb.startUpNewBookieServer();
+        
+        // hope that all the subscriptions will be OK
+        queue.take();
+    }
+
+}