You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/13 13:07:55 UTC

svn commit: r1421242 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ bookkeeper...

Author: ivank
Date: Thu Dec 13 12:07:52 2012
New Revision: 1421242

URL: http://svn.apache.org/viewvc?rev=1421242&view=rev
Log:
BOOKKEEPER-336: bookie readEntries is taking more time if the ensemble has failed bookie(s) (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Dec 13 12:07:52 2012
@@ -138,6 +138,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-365: Ledger will never recover if one of the quorum bookie is down forever and others dont have entry (sijie via ivank)
 
+        BOOKKEEPER-336: bookie readEntries is taking more time if the ensemble has failed bookie(s) (ivank)
+
       hedwig-protocol:
 
         BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Dec 13 12:07:52 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
@@ -75,6 +76,7 @@ public class BookKeeper {
     final BookieWatcher bookieWatcher;
 
     final OrderedSafeExecutor mainWorkerPool;
+    final ScheduledExecutorService scheduler;
 
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
@@ -125,9 +127,11 @@ public class BookKeeper {
 
         this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                                                                 Executors.newCachedThreadPool());
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher = new BookieWatcher(conf, scheduler, this);
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -187,10 +191,11 @@ public class BookKeeper {
         this.conf = conf;
         this.zk = zk;
         this.channelFactory = channelFactory;
+        this.scheduler = Executors.newSingleThreadScheduledExecutor();
 
         mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
-        bookieWatcher = new BookieWatcher(conf, this);
+        bookieWatcher = new BookieWatcher(conf, scheduler, this);
         bookieWatcher.readBookiesBlocking();
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -562,7 +567,7 @@ public class BookKeeper {
         } catch (IOException ie) {
             LOG.error("Failed to close ledger manager : ", ie);
         }
-        bookieWatcher.halt();
+        scheduler.shutdown();
         if (ownChannelFactory) {
             channelFactory.releaseExternalResources();
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Thu Dec 13 12:07:52 2012
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executors;
+
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -61,10 +61,10 @@ class BookieWatcher implements Watcher, 
     static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
     public static int ZK_CONNECT_BACKOFF_SEC = 1;
 
-    BookKeeper bk;
-    ScheduledExecutorService scheduler;
+    final BookKeeper bk;
 
     HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+    final ScheduledExecutorService scheduler;
 
     SafeRunnable reReadTask = new SafeRunnable() {
         @Override
@@ -74,18 +74,16 @@ class BookieWatcher implements Watcher, 
     };
     private ReadOnlyBookieWatcher readOnlyBookieWatcher;
 
-    public BookieWatcher(ClientConfiguration conf, BookKeeper bk) throws KeeperException, InterruptedException {
+    public BookieWatcher(ClientConfiguration conf,
+                         ScheduledExecutorService scheduler,
+                         BookKeeper bk) throws KeeperException, InterruptedException  {
         this.bk = bk;
         // ZK bookie registration path
         this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
-        this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        this.scheduler = scheduler;
         readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
     }
 
-    public void halt() {
-        scheduler.shutdown();
-    }
-
     public void readBookies() {
         readBookies(this);
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Thu Dec 13 12:07:52 2012
@@ -382,7 +382,8 @@ public class LedgerHandle {
         }
 
         try {
-            new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
+            new PendingReadOp(this, bk.scheduler,
+                              firstEntry, lastEntry, cb, ctx).initiate();
         } catch (InterruptedException e) {
             cb.readComplete(BKException.Code.InterruptedException, this, null, ctx);
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Thu Dec 13 12:07:52 2012
@@ -21,11 +21,19 @@ package org.apache.bookkeeper.client;
  *
  */
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -45,7 +53,11 @@ import org.jboss.netty.buffer.ChannelBuf
 class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
     Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
 
+    final int speculativeReadTimeout;
+    final private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> speculativeTask = null;
     Queue<LedgerEntryRequest> seq;
+    Set<InetSocketAddress> heardFromHosts;
     ReadCallback cb;
     Object ctx;
     LedgerHandle lh;
@@ -54,7 +66,8 @@ class PendingReadOp implements Enumerati
     long endEntryId;
     final int maxMissedReadsAllowed;
 
-    private class LedgerEntryRequest extends LedgerEntry {
+    class LedgerEntryRequest extends LedgerEntry {
+        final static int NOT_FOUND = -1;
         int nextReplicaIndexToReadFrom = 0;
         AtomicBoolean complete = new AtomicBoolean(false);
 
@@ -62,14 +75,77 @@ class PendingReadOp implements Enumerati
         int numMissedEntryReads = 0;
 
         final ArrayList<InetSocketAddress> ensemble;
+        final List<Integer> writeSet;
+        final BitSet sentReplicas;
+        final BitSet erroredReplicas;
 
         LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
             super(lId, eId);
 
             this.ensemble = ensemble;
+            this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
+            this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+            this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
+        }
+
+        private int getReplicaIndex(InetSocketAddress host) {
+            int bookieIndex = ensemble.indexOf(host);
+            if (bookieIndex == -1) {
+                return NOT_FOUND;
+            }
+            return writeSet.indexOf(bookieIndex);
+        }
+
+        private BitSet getSentToBitSet() {
+            BitSet b = new BitSet(ensemble.size());
+
+            for (int i = 0; i < sentReplicas.length(); i++) {
+                if (sentReplicas.get(i)) {
+                    b.set(writeSet.get(i));
+                }
+            }
+            return b;
         }
 
-        void sendNextRead() {
+        private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
+            BitSet b = new BitSet(ensemble.size());
+            for (InetSocketAddress i : heardFromHosts) {
+                int index = ensemble.indexOf(i);
+                if (index != -1) {
+                    b.set(index);
+                }
+            }
+            return b;
+        }
+
+        private boolean readsOutstanding() {
+            return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
+        }
+
+        /**
+         * Send to next replica speculatively, if required and possible.
+         * This returns the host we may have sent to for unit testing.
+         * @return host we sent to if we sent. null otherwise.
+         */
+        synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
+            if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
+                return null;
+            }
+
+            BitSet sentTo = getSentToBitSet();
+            BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
+            sentTo.and(heardFrom);
+
+            // only send another read, if we have had no response at all (even for other entries)
+            // from any of the other bookies we have sent the request to
+            if (sentTo.cardinality() == 0) {
+                return sendNextRead();
+            } else {
+                return null;
+            }
+        }
+
+        synchronized InetSocketAddress sendNextRead() {
             if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just fail the
                 // read
@@ -82,22 +158,27 @@ class PendingReadOp implements Enumerati
                 }
 
                 submitCallback(firstError);
-                return;
+                return null;
             }
 
+            int replica = nextReplicaIndexToReadFrom;
             int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
             nextReplicaIndexToReadFrom++;
 
             try {
-                sendReadTo(ensemble.get(bookieIndex), this);
+                InetSocketAddress to = ensemble.get(bookieIndex);
+                sendReadTo(to, this);
+                sentReplicas.set(replica);
+                return to;
             } catch (InterruptedException ie) {
                 LOG.error("Interrupted reading entry " + this, ie);
                 Thread.currentThread().interrupt();
                 submitCallback(BKException.Code.ReadException);
+                return null;
             }
         }
 
-        void logErrorAndReattemptRead(String errMsg, int rc) {
+        synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
             if (BKException.Code.OK == firstError ||
                 BKException.Code.NoSuchEntryException == firstError) {
                 firstError = rc;
@@ -113,18 +194,27 @@ class PendingReadOp implements Enumerati
 
             int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1);
             LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
-                      + ensemble.get(bookieIndex));
+                      + host);
 
-            sendNextRead();
+            int replica = getReplicaIndex(host);
+            if (replica == NOT_FOUND) {
+                LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
+                return;
+            }
+            erroredReplicas.set(replica);
+
+            if (!readsOutstanding()) {
+                sendNextRead();
+            }
         }
 
         // return true if we managed to complete the entry
-        boolean complete(final ChannelBuffer buffer) {
+        boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
             ChannelBufferInputStream is;
             try {
                 is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKDigestMatchException e) {
-                logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException);
+                logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
                 return false;
             }
 
@@ -151,22 +241,38 @@ class PendingReadOp implements Enumerati
         }
     }
 
-    PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
-
-        seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - startEntryId));
+    PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
+                  long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
+        seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
         this.cb = cb;
         this.ctx = ctx;
         this.lh = lh;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
+        this.scheduler = scheduler;
         numPendingEntries = endEntryId - startEntryId + 1;
         maxMissedReadsAllowed = lh.metadata.getWriteQuorumSize() - lh.metadata.getAckQuorumSize();
+        speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
+        heardFromHosts = new HashSet<InetSocketAddress>();
     }
 
     public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
 
         ArrayList<InetSocketAddress> ensemble = null;
+
+        if (speculativeReadTimeout > 0) {
+            speculativeTask = scheduler.scheduleWithFixedDelay(new Runnable() {
+                    public void run() {
+                        for (LedgerEntryRequest r : seq) {
+                            if (!r.isComplete()) {
+                                r.maybeSendSpeculativeRead(heardFromHosts);
+                            }
+                        }
+                    }
+                }, speculativeReadTimeout, speculativeReadTimeout, TimeUnit.MILLISECONDS);
+        }
+
         do {
             LOG.debug("Acquiring lock: {}", i);
 
@@ -182,25 +288,38 @@ class PendingReadOp implements Enumerati
         } while (i <= endEntryId);
     }
 
+    private static class ReadContext {
+        final InetSocketAddress to;
+        final LedgerEntryRequest entry;
+
+        ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
+            this.to = to;
+            this.entry = entry;
+        }
+    }
+
     void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
         lh.opCounterSem.acquire();
 
         lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, 
-                                     this, entry);
+                                     this, new ReadContext(to, entry));
     }
 
     @Override
     public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
-        final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
+        final ReadContext rctx = (ReadContext)ctx;
+        final LedgerEntryRequest entry = rctx.entry;
 
         lh.opCounterSem.release();
 
         if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc);
+            entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
             return;
         }
 
-        if (entry.complete(buffer)) {
+        heardFromHosts.add(rctx.to);
+
+        if (entry.complete(rctx.to, buffer)) {
             numPendingEntries--;
         }
 
@@ -213,6 +332,9 @@ class PendingReadOp implements Enumerati
     }
 
     private void submitCallback(int code) {
+        if (speculativeTask != null) {
+            speculativeTask.cancel(true);
+        }
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
     }
     public boolean hasMoreElements() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Thu Dec 13 12:07:52 2012
@@ -43,6 +43,7 @@ public class ClientConfiguration extends
     // NIO Parameters
     protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
     protected final static String READ_TIMEOUT = "readTimeout";
+    protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
 
     // Number Woker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
@@ -275,4 +276,39 @@ public class ClientConfiguration extends
         setProperty(NUM_WORKER_THREADS, numThreads);
         return this;
     }
+
+    /**
+     * Get the period of time after which a speculative entry read should be triggered.
+     * A speculative entry read is sent to the next replica bookie before
+     * an error or response has been received for the previous entry read request.
+     *
+     * A speculative entry read is only sent if we have not heard from the current
+     * replica bookie during the entire read operation, which may comprise many entries.
+     *
+     * Speculative reads allow the client to avoid having to wait for the connect timeout
+     * in the case that a bookie has failed. It induces higher load on the network and on
+     * bookies. This should be taken into account before changing this configuration value.
+     *
+     * @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries
+     * @return the speculative read timeout in milliseconds. Default 2000.
+     */
+    public int getSpeculativeReadTimeout() {
+        return getInt(SPECULATIVE_READ_TIMEOUT, 2000);
+    }
+
+    /**
+     * Set the speculative read timeout. A lower timeout will reduce read latency in the
+     * case of a failed bookie, while increasing the load on bookies and the network.
+     *
+     * The default is 2000 milliseconds. A value of 0 will disable speculative reads
+     * completely.
+     *
+     * @see #getSpeculativeReadTimeout()
+     * @param timeout the timeout value, in milliseconds
+     * @return client configuration
+     */
+    public ClientConfiguration setSpeculativeReadTimeout(int timeout) {
+        setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
+        return this;
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java?rev=1421242&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java Thu Dec 13 12:07:52 2012
@@ -0,0 +1,333 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.junit.*;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.test.BaseTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This unit test tests speculative reads.
+ *
+ */
+public class TestSpeculativeRead extends BaseTestCase {
+    static Logger LOG = LoggerFactory.getLogger(TestSpeculativeRead.class);
+
+    DigestType digestType;
+    byte[] passwd = "specPW".getBytes();
+
+    public TestSpeculativeRead(DigestType digestType) {
+        super(10);
+        this.digestType = digestType;
+    }
+
+    long getLedgerToRead(int ensemble, int quorum) throws Exception {
+        byte[] data = "Data for test".getBytes();
+        LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd);
+        for (int i = 0; i < 10; i++) {
+            l.addEntry(data);
+        }
+        l.close();
+
+        return l.getId();
+    }
+
+    BookKeeper createClient(int specTimeout) throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+            .setSpeculativeReadTimeout(specTimeout)
+            .setReadTimeout(30000);
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        return new BookKeeper(conf);
+    }
+
+    class LatchCallback implements ReadCallback {
+        CountDownLatch l = new CountDownLatch(1);
+        boolean success = false;
+        long startMillis = System.currentTimeMillis();
+        long endMillis = Long.MAX_VALUE;
+
+        public void readComplete(int rc,
+                                 LedgerHandle lh,
+                                 Enumeration<LedgerEntry> seq,
+                                 Object ctx) {
+            endMillis = System.currentTimeMillis();
+            LOG.debug("Got response {} {}", rc, getDuration());
+            success = rc == BKException.Code.OK;
+            l.countDown();
+        }
+
+        long getDuration() {
+            return endMillis - startMillis;
+        }
+
+        void expectSuccess(int milliseconds) throws Exception {
+            assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+            assertTrue(success);
+        }
+
+        void expectFail(int milliseconds) throws Exception {
+            assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS));
+            assertFalse(success);
+        }
+
+        void expectTimeout(int milliseconds) throws Exception {
+            assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS));
+        }
+    }
+
+    /**
+     * Test basic speculative functionality.
+     * - Create 2 clients with a very long read timeout, one with spec
+     *   read enabled, the other not.
+     * - create ledger
+     * - sleep second bookie in ensemble
+     * - read first entry, both should find on first bookie.
+     * - read second entry, spec client should find on bookie three,
+     *   non spec client should hang.
+     */
+    @Test
+    public void testSpeculativeRead() throws Exception {
+        long id = getLedgerToRead(3,2);
+        BookKeeper bknospec = createClient(0); // disabled
+        BookKeeper bkspec = createClient(2000);
+
+        LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd);
+        LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd);
+
+        // sleep second bookie
+        CountDownLatch sleepLatch = new CountDownLatch(1);
+        InetSocketAddress second = lnospec.getLedgerMetadata().getEnsembles().get(0L).get(1);
+        sleepBookie(second, sleepLatch);
+
+        try {
+            // read first entry, both go to first bookie, should be fine
+            LatchCallback nospeccb = new LatchCallback();
+            LatchCallback speccb = new LatchCallback();
+            lnospec.asyncReadEntries(0, 0, nospeccb, null);
+            lspec.asyncReadEntries(0, 0, speccb, null);
+            nospeccb.expectSuccess(2000);
+            speccb.expectSuccess(2000);
+
+            // read second entry, both look for second bookie, spec read client
+            // tries third bookie, nonspec client hangs as read timeout is very long.
+            nospeccb = new LatchCallback();
+            speccb = new LatchCallback();
+            lnospec.asyncReadEntries(1, 1, nospeccb, null);
+            lspec.asyncReadEntries(1, 1, speccb, null);
+            speccb.expectSuccess(4000);
+            nospeccb.expectTimeout(4000);
+        } finally {
+            sleepLatch.countDown();
+            lspec.close();
+            lnospec.close();
+            bkspec.close();
+            bknospec.close();
+        }
+    }
+
+    /**
+     * Test that if more than one replica is down, we can still read, as long as the quorum
+     * size is larger than the number of down replicas.
+     */
+    @Test
+    public void testSpeculativeReadMultipleReplicasDown() throws Exception {
+        long id = getLedgerToRead(5,5);
+        int timeout = 5000;
+        BookKeeper bkspec = createClient(timeout);
+
+        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+        // sleep bookie 1, 2 & 4
+        CountDownLatch sleepLatch = new CountDownLatch(1);
+        sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch);
+        sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(2), sleepLatch);
+        sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(4), sleepLatch);
+
+        try {
+            // read first entry, should complete faster than timeout
+            // as bookie 0 has the entry
+            LatchCallback latch0 = new LatchCallback();
+            l.asyncReadEntries(0, 0, latch0, null);
+            latch0.expectSuccess(timeout/2);
+
+            // second should have to hit two timeouts (bookie 1 & 2)
+            // bookie 3 has the entry
+            LatchCallback latch1 = new LatchCallback();
+            l.asyncReadEntries(1, 1, latch1, null);
+            latch1.expectTimeout(timeout);
+            latch1.expectSuccess(timeout*2);
+            LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration());
+            assertTrue("should have taken longer than two timeouts, but less than 3",
+                       latch1.getDuration() >= timeout*2
+                       && latch1.getDuration() < timeout*3);
+
+            // third should have to hit one timeout (bookie 2)
+            // bookie 3 has the entry
+            LatchCallback latch2 = new LatchCallback();
+            l.asyncReadEntries(2, 2, latch2, null);
+            latch2.expectTimeout(timeout/2);
+            latch2.expectSuccess(timeout);
+            LOG.info("Timeout {} latch2 duration {}", timeout, latch2.getDuration());
+            assertTrue("should have taken longer than one timeout, but less than 2",
+                       latch2.getDuration() >= timeout
+                       && latch2.getDuration() < timeout*2);
+
+            // fourth should have no timeout
+            // bookie 3 has the entry
+            LatchCallback latch3 = new LatchCallback();
+            l.asyncReadEntries(3, 3, latch3, null);
+            latch3.expectSuccess(timeout/2);
+
+            // fifth should hit one timeout, (bookie 4)
+            // bookie 0 has the entry
+            LatchCallback latch4 = new LatchCallback();
+            l.asyncReadEntries(4, 4, latch4, null);
+            latch4.expectTimeout(timeout/2);
+            latch4.expectSuccess(timeout);
+            LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration());
+            assertTrue("should have taken longer than one timeout, but less than 2",
+                       latch4.getDuration() >= timeout
+                       && latch4.getDuration() < timeout*2);
+
+        } finally {
+            sleepLatch.countDown();
+            l.close();
+            bkspec.close();
+        }
+    }
+
+    /**
+     * Check that it is harmless for the original read to complete after a
+     * speculative read has already been kicked off.
+     */
+    @Test
+    public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception {
+        final long ledgerId = getLedgerToRead(2,2);
+        final int timeout = 1000;
+        final int halfTimeout = timeout/2;
+        BookKeeper bkspec = createClient(timeout);
+
+        LedgerHandle l = bkspec.openLedger(ledgerId, digestType, passwd);
+
+        // Put both bookies of the first ensemble to sleep, each on its own
+        // latch so that they can be woken independently.
+        CountDownLatch sleepLatch0 = new CountDownLatch(1);
+        CountDownLatch sleepLatch1 = new CountDownLatch(1);
+        sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(0), sleepLatch0);
+        sleepBookie(l.getLedgerMetadata().getEnsembles().get(0L).get(1), sleepLatch1);
+
+        try {
+            // The read goes to the first bookie; once the speculative read
+            // timeout occurs it goes to the second as well. Both are asleep,
+            // so nothing completes within the timeout.
+            LatchCallback latch0 = new LatchCallback();
+            l.asyncReadEntries(0, 0, latch0, null);
+            latch0.expectTimeout(timeout);
+
+            // wake up the first bookie only; the read should now succeed
+            sleepLatch0.countDown();
+            latch0.expectSuccess(halfTimeout);
+
+            // wake up the second bookie
+            sleepLatch1.countDown();
+
+            // the next entry must be readable without issue
+            LatchCallback latch1 = new LatchCallback();
+            l.asyncReadEntries(1, 1, latch1, null);
+            latch1.expectSuccess(halfTimeout);
+
+        } finally {
+            // release both latches again in case the test failed part-way
+            sleepLatch0.countDown();
+            sleepLatch1.countDown();
+            l.close();
+            bkspec.close();
+        }
+    }
+
+    /**
+     * Unit test for the speculative read scheduling method,
+     * {@code LedgerEntryRequest.maybeSendSpeculativeRead}.
+     *
+     * Each request is driven by hand with a set of hosts that have already
+     * been heard from. The test checks which host the call returns, or that
+     * it returns null when no further read should be sent.
+     */
+    @Test
+    public void testSpeculativeReadScheduling() throws Exception {
+        long id = getLedgerToRead(3,2);
+        int timeout = 1000;
+        BookKeeper bkspec = createClient(timeout);
+
+        LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
+
+        ArrayList<InetSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
+        // Explicit type arguments rather than raw HashSet, so the
+        // assignments are checked by the compiler without warnings.
+        Set<InetSocketAddress> allHosts = new HashSet<InetSocketAddress>(ensemble);
+        Set<InetSocketAddress> noHost = new HashSet<InetSocketAddress>();
+        Set<InetSocketAddress> secondHostOnly = new HashSet<InetSocketAddress>();
+        secondHostOnly.add(ensemble.get(1));
+        try {
+            LatchCallback latch0 = new LatchCallback();
+            PendingReadOp op = new PendingReadOp(l, bkspec.scheduler,
+                                                 0, 5, latch0, null);
+
+            // The comparisons below are written expected.equals(actual): if
+            // nothing was sent and null comes back, the assertion fails with
+            // its message instead of throwing a NullPointerException.
+
+            // if we've already heard from all hosts,
+            // we only send the initial read
+            PendingReadOp.LedgerEntryRequest req0
+                = op.new LedgerEntryRequest(ensemble, l.getId(), 0);
+            assertTrue("Should have sent to first",
+                       ensemble.get(0).equals(req0.maybeSendSpeculativeRead(allHosts)));
+            assertNull("Should not have sent another",
+                       req0.maybeSendSpeculativeRead(allHosts));
+
+            // if we have heard from some hosts, but not one we have sent to,
+            // send again
+            PendingReadOp.LedgerEntryRequest req2
+                = op.new LedgerEntryRequest(ensemble, l.getId(), 2);
+            assertTrue("Should have sent to third",
+                       ensemble.get(2).equals(req2.maybeSendSpeculativeRead(noHost)));
+            assertTrue("Should have sent to first",
+                       ensemble.get(0).equals(req2.maybeSendSpeculativeRead(secondHostOnly)));
+
+            // if we have heard from some hosts, which includes one we sent to,
+            // do not read again
+            PendingReadOp.LedgerEntryRequest req4
+                = op.new LedgerEntryRequest(ensemble, l.getId(), 4);
+            assertTrue("Should have sent to second",
+                       ensemble.get(1).equals(req4.maybeSendSpeculativeRead(noHost)));
+            assertNull("Should not have sent another",
+                       req4.maybeSendSpeculativeRead(secondHostOnly));
+        } finally {
+            // wait for all ops to complete
+            l.opCounterSem.acquire(bkspec.getConf().getThrottleValue());
+
+            l.close();
+            bkspec.close();
+        }
+    }
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1421242&r1=1421241&r2=1421242&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Thu Dec 13 12:07:52 2012
@@ -241,7 +241,7 @@ public abstract class BookKeeperClusterT
     public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
             throws InterruptedException, IOException {
         final CountDownLatch l = new CountDownLatch(1);
-        final String name = "BookieJournal-" + addr.getPort();
+        final String name = "NIOServerFactory-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
         for (final Thread t : allthreads) {
@@ -278,7 +278,7 @@ public abstract class BookKeeperClusterT
      */
     public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
             throws InterruptedException, IOException {
-        final String name = "BookieJournal-" + addr.getPort();
+        final String name = "NIOServerFactory-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
         for (final Thread t : allthreads) {