You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/02/01 12:06:33 UTC

svn commit: r1239068 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-...

Author: ivank
Date: Wed Feb  1 11:06:32 2012
New Revision: 1239068

URL: http://svn.apache.org/viewvc?rev=1239068&view=rev
Log:
BOOKKEEPER-23: Timeout requests (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb  1 11:06:32 2012
@@ -26,6 +26,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-153: Ledger can't be opened or closed due to zero-length metadata (Sijie Guo via ivank)
 
+        BOOKKEEPER-23: Timeout requests (ivank)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Wed Feb  1 11:06:32 2012
@@ -245,6 +245,7 @@ public class Bookie extends Thread {
 
     public Bookie(ServerConfiguration conf) 
             throws IOException, KeeperException, InterruptedException {
+        super("Bookie-" + conf.getBookiePort());
         this.conf = conf;
         this.journalDirectory = conf.getJournalDir();
         this.ledgerDirectories = conf.getLedgerDirs();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Wed Feb  1 11:06:32 2012
@@ -38,6 +38,7 @@ public class ClientConfiguration extends
 
     // NIO Parameters
     protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
+    protected final static String READ_TIMEOUT = "readTimeout";
 
     /**
      * Construct a default client-side configuration
@@ -213,4 +214,25 @@ public class ClientConfiguration extends
         return this;
     }
 
+    /**
+     * Get the socket read timeout. This is the number of
+     * seconds we wait without hearing a response from a bookie
+     * before we consider it failed.
+     *
+     * @return the current read timeout in seconds
+     */
+    public int getReadTimeout() {
+        return getInt(READ_TIMEOUT, 5);
+    }
+
+    /**
+     * Set the socket read timeout.
+     * @see #getReadTimeout()
+     * @param timeout The new read timeout in seconds
+     * @return client configuration
+     */
+    public ClientConfiguration setReadTimeout(int timeout) {
+        setProperty(READ_TIMEOUT, Integer.toString(timeout));
+        return this;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/NIOServerFactory.java Wed Feb  1 11:06:32 2012
@@ -75,7 +75,7 @@ public class NIOServerFactory extends Th
     ServerConfiguration conf;
 
     public NIOServerFactory(ServerConfiguration conf, PacketProcessor processor) throws IOException {
-        super("NIOServerFactory");
+        super("NIOServerFactory-" + conf.getBookiePort());
         setDaemon(true);
         this.processor = processor;
         this.conf = conf;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Wed Feb  1 11:06:32 2012
@@ -36,6 +36,8 @@ import org.apache.bookkeeper.util.Ordere
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.jboss.netty.util.Timer;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -56,6 +58,8 @@ import org.jboss.netty.channel.socket.Cl
 import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
 
 /**
  * This class manages all details of connection to a particular bookie. It also
@@ -76,6 +80,7 @@ public class PerChannelBookieClient exte
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
     OrderedSafeExecutor executor;
+    private Timer readTimeoutTimer;
 
     ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
     ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
@@ -107,6 +112,7 @@ public class PerChannelBookieClient exte
         this.totalBytesOutstanding = totalBytesOutstanding;
         this.channelFactory = channelFactory;
         this.state = ConnectionState.DISCONNECTED;
+        this.readTimeoutTimer = new HashedWheelTimer();
     }
 
     synchronized private void connect() {
@@ -375,6 +381,9 @@ public class PerChannelBookieClient exte
     @Override
     public ChannelPipeline getPipeline() throws Exception {
         ChannelPipeline pipeline = Channels.pipeline();
+
+        pipeline.addLast("readTimeout", new ReadTimeoutHandler(readTimeoutTimer, 
+                                                               conf.getReadTimeout()));
         pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
         pipeline.addLast("mainhandler", this);
         return pipeline;
@@ -388,6 +397,7 @@ public class PerChannelBookieClient exte
         LOG.info("Disconnected from bookie: " + addr);
         errorOutOutstandingEntries();
         channel.close();
+        readTimeoutTimer.stop();
 
         state = ConnectionState.DISCONNECTED;
 
@@ -407,6 +417,11 @@ public class PerChannelBookieClient exte
                       + e.getChannel().getRemoteAddress());
             return;
         }
+        if (t instanceof ReadTimeoutException) {
+            ctx.getChannel().disconnect();
+            return;
+        }
+ 
         if (t instanceof IOException) {
             // these are thrown when a bookie fails, logging them just pollutes
             // the logs (the failure is logged from the listeners on the write

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java?rev=1239068&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadTimeout.java Wed Feb  1 11:06:32 2012
@@ -0,0 +1,101 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.junit.*;
+import java.net.InetSocketAddress;
+import java.util.Enumeration;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BaseTestCase;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This unit test tests that a request to an unresponsive bookie times out;
+ *
+ */
+public class TestReadTimeout extends BaseTestCase {
+    static Logger LOG = LoggerFactory.getLogger(TestReadTimeout.class);
+
+    DigestType digestType;
+
+    public TestReadTimeout(DigestType digestType) {
+        super(10);
+        this.digestType = digestType;
+    }
+
+    @Test
+    public void testReadTimeout() throws Exception {
+        final AtomicBoolean completed = new AtomicBoolean(false);
+
+        LedgerHandle writelh = bkc.createLedger(3,3,digestType, "testPasswd".getBytes());
+        String tmp = "Foobar";
+        
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            writelh.addEntry(tmp.getBytes());
+        }
+        
+        Set<InetSocketAddress> beforeSet = new HashSet<InetSocketAddress>();
+        for (InetSocketAddress addr : writelh.getLedgerMetadata().getEnsemble(numEntries)) {
+            beforeSet.add(addr);
+        }
+
+        final InetSocketAddress bookieToSleep 
+            = writelh.getLedgerMetadata().getEnsemble(numEntries).get(0);
+        int sleeptime = baseClientConf.getReadTimeout()*3;
+        CountDownLatch latch = new CountDownLatch(1);
+        sleepBookie(bookieToSleep, sleeptime, latch);
+        latch.await();
+
+        writelh.asyncAddEntry(tmp.getBytes(), 
+                new AddCallback() {
+                    public void addComplete(int rc, LedgerHandle lh, 
+                                            long entryId, Object ctx) {
+                        completed.set(true);
+                    }
+                }, null);
+        Thread.sleep((baseClientConf.getReadTimeout()*2)*1000);
+        Assert.assertTrue("Write request did not finish", completed.get());
+
+        Set<InetSocketAddress> afterSet = new HashSet<InetSocketAddress>();
+        for (InetSocketAddress addr : writelh.getLedgerMetadata().getEnsemble(numEntries+1)) {
+            afterSet.add(addr);
+        }
+        beforeSet.removeAll(afterSet);
+        Assert.assertTrue("Bookie set should not match", beforeSet.size() != 0);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java?rev=1239068&r1=1239067&r2=1239068&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java Wed Feb  1 11:06:32 2012
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -166,6 +167,33 @@ public abstract class BaseTestCase exten
         }
     }
 
+    public void sleepBookie(InetSocketAddress addr, final int seconds,
+                            final CountDownLatch l)
+            throws InterruptedException, IOException {
+        final String name = "Bookie-" + addr.getPort();
+        Thread[] allthreads = new Thread[Thread.activeCount()];
+        Thread.enumerate(allthreads);
+        for (final Thread t : allthreads) {
+            if (t.getName().equals(name)) {
+                Thread sleeper = new Thread() {
+                    public void run() {
+                        try {
+                            t.suspend();
+                            l.countDown();
+                            Thread.sleep(seconds*1000);
+                            t.resume();
+                        } catch (Exception e) {
+                            LOG.error("Error suspending thread", e);
+                        }
+                    }
+                };
+                sleeper.start();
+                return;
+            }
+        }
+        throw new IOException("Bookie thread not found");
+    }
+
     /**
      * Restart bookie servers
      *