You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/03 10:55:02 UTC

svn commit: r1416408 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src: main/java/org/apache/bookkeeper/client/ main/java/org/apache/bookkeeper/conf/ test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/

Author: ivank
Date: Mon Dec  3 09:55:01 2012
New Revision: 1416408

URL: http://svn.apache.org/viewvc?rev=1416408&view=rev
Log:
[REVERT] BOOKKEEPER-336: bookie readEntries is taking more time if the ensemble has failed bookie(s). Basic speculative functionality in place.
Accidentally committed this change before approval. Reverting. (ivank)

Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
Modified:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Mon Dec  3 09:55:01 2012
@@ -21,19 +21,11 @@ package org.apache.bookkeeper.client;
  *
  */
 import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.BitSet;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.List;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
@@ -53,10 +45,7 @@ import org.jboss.netty.buffer.ChannelBuf
 class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
     Logger LOG = LoggerFactory.getLogger(PendingReadOp.class);
 
-    final int speculativeReadTimeout;
-    Timer speculativeReadTimer;
     Queue<LedgerEntryRequest> seq;
-    Set<InetSocketAddress> heardFromHosts;
     ReadCallback cb;
     Object ctx;
     LedgerHandle lh;
@@ -64,135 +53,59 @@ class PendingReadOp implements Enumerati
     long startEntryId;
     long endEntryId;
 
-    class LedgerEntryRequest extends LedgerEntry {
+    private class LedgerEntryRequest extends LedgerEntry {
         int nextReplicaIndexToReadFrom = 0;
         AtomicBoolean complete = new AtomicBoolean(false);
 
         int firstError = BKException.Code.OK;
 
         final ArrayList<InetSocketAddress> ensemble;
-        final List<Integer> writeSet;
-        final BitSet sentReplicas;
-        final BitSet erroredReplicas;
 
         LedgerEntryRequest(ArrayList<InetSocketAddress> ensemble, long lId, long eId) {
             super(lId, eId);
 
             this.ensemble = ensemble;
-            this.writeSet = lh.distributionSchedule.getWriteSet(entryId);
-            this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
-            this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
-        }
-
-        private int getReplicaIndex(InetSocketAddress host) {
-            int bookieIndex = ensemble.indexOf(host);
-            if (bookieIndex == -1) {
-                return -1;
-            }
-            return writeSet.indexOf(bookieIndex);
-        }
-
-        private BitSet getSentToBitSet() {
-            BitSet b = new BitSet(ensemble.size());
-
-            for (int i = 0; i < sentReplicas.length(); i++) {
-                if (sentReplicas.get(i)) {
-                    b.set(writeSet.get(i));
-                }
-            }
-            return b;
-        }
-
-        private BitSet getHeardFromBitSet(Set<InetSocketAddress> heardFromHosts) {
-            BitSet b = new BitSet(ensemble.size());
-            for (InetSocketAddress i : heardFromHosts) {
-                int index = ensemble.indexOf(i);
-                if (index != -1) {
-                    b.set(index);
-                }
-            }
-            return b;
-        }
-
-        private boolean readsOutstanding() {
-            return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
-        }
-
-        /**
-         * Send to next replica speculatively, if required and possible.
-         * This returns the host we may have sent to for unit testing.
-         * @return host we sent to if we sent. null otherwise.
-         */
-        synchronized InetSocketAddress maybeSendSpeculativeRead(Set<InetSocketAddress> heardFromHosts) {
-            if (nextReplicaIndexToReadFrom >= lh.getLedgerMetadata().getWriteQuorumSize()) {
-                return null;
-            }
-
-            BitSet sentTo = getSentToBitSet();
-            BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
-            sentTo.and(heardFrom);
-
-            // only send another read, if we have had no response at all (even for other entries)
-            // from any of the other bookies we have sent the request to
-            if (sentTo.cardinality() == 0) {
-                return sendNextRead();
-            } else {
-                return null;
-            }
         }
 
-        synchronized InetSocketAddress sendNextRead() {
+        void sendNextRead() {
             if (nextReplicaIndexToReadFrom >= lh.metadata.getWriteQuorumSize()) {
                 // we are done, the read has failed from all replicas, just fail the
                 // read
                 submitCallback(firstError);
-                return null;
+                return;
             }
 
-            int replica = nextReplicaIndexToReadFrom;
             int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom);
             nextReplicaIndexToReadFrom++;
 
             try {
-                InetSocketAddress to = ensemble.get(bookieIndex);
-                sendReadTo(to, this);
-                sentReplicas.set(replica);
-                return to;
+                sendReadTo(ensemble.get(bookieIndex), this);
             } catch (InterruptedException ie) {
                 LOG.error("Interrupted reading entry " + this, ie);
                 Thread.currentThread().interrupt();
                 submitCallback(BKException.Code.ReadException);
-                return null;
             }
         }
 
-        synchronized void logErrorAndReattemptRead(InetSocketAddress host, String errMsg, int rc) {
+        void logErrorAndReattemptRead(String errMsg, int rc) {
             if (firstError == BKException.Code.OK) {
                 firstError = rc;
             }
 
+            int bookieIndex = lh.distributionSchedule.getWriteSet(entryId).get(nextReplicaIndexToReadFrom - 1);
             LOG.error(errMsg + " while reading entry: " + entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
-                      + host);
+                      + ensemble.get(bookieIndex));
 
-            int replica = getReplicaIndex(host);
-            if (replica == -1) {
-                LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
-                return;
-            }
-            erroredReplicas.set(replica);
-
-            if (!readsOutstanding()) {
-                sendNextRead();
-            }
+            sendNextRead();
         }
 
         // return true if we managed to complete the entry
-        boolean complete(InetSocketAddress host, final ChannelBuffer buffer) {
+        boolean complete(final ChannelBuffer buffer) {
             ChannelBufferInputStream is;
             try {
                 is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKDigestMatchException e) {
-                logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+                logErrorAndReattemptRead("Mac mismatch", BKException.Code.DigestMatchException);
                 return false;
             }
 
@@ -220,40 +133,23 @@ class PendingReadOp implements Enumerati
     }
 
     PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
-        seq = new ArrayBlockingQueue<LedgerEntryRequest>((int) ((endEntryId + 1) - startEntryId));
+
+        seq = new ArrayDeque<LedgerEntryRequest>((int) (endEntryId - startEntryId));
         this.cb = cb;
         this.ctx = ctx;
         this.lh = lh;
         this.startEntryId = startEntryId;
         this.endEntryId = endEntryId;
         numPendingEntries = endEntryId - startEntryId + 1;
-        speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
-        if (speculativeReadTimeout > 0) {
-            speculativeReadTimer = new Timer("SpeculativeRead-L"+lh.getId()+"-S"+startEntryId+"-E"+endEntryId);
-        } else {
-            speculativeReadTimer = null;
-        }
-        heardFromHosts = new HashSet<InetSocketAddress>();
     }
 
     public void initiate() throws InterruptedException {
         long nextEnsembleChange = startEntryId, i = startEntryId;
 
         ArrayList<InetSocketAddress> ensemble = null;
-
-        if (speculativeReadTimer != null) {
-            speculativeReadTimer.schedule(new TimerTask() {
-                    public void run() {
-                        for (LedgerEntryRequest r : seq) {
-                            if (!r.isComplete()) {
-                                r.maybeSendSpeculativeRead(heardFromHosts);
-                            }
-                        }
-                    }
-                }, speculativeReadTimeout, speculativeReadTimeout);
-        }
-
         do {
+            LOG.debug("Acquiring lock: {}", i);
+
             if (i == nextEnsembleChange) {
                 ensemble = lh.metadata.getEnsemble(i);
                 nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
@@ -266,27 +162,16 @@ class PendingReadOp implements Enumerati
         } while (i <= endEntryId);
     }
 
-    private static class ReadContext {
-        final InetSocketAddress to;
-        final LedgerEntryRequest entry;
-
-        ReadContext(InetSocketAddress to, LedgerEntryRequest entry) {
-            this.to = to;
-            this.entry = entry;
-        }
-    }
-
     void sendReadTo(InetSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
         lh.opCounterSem.acquire();
 
         lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId, 
-                                     this, new ReadContext(to, entry));
+                                     this, entry);
     }
 
     @Override
     public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
-        final ReadContext rctx = (ReadContext)ctx;
-        final LedgerEntryRequest entry = rctx.entry;
+        final LedgerEntryRequest entry = (LedgerEntryRequest) ctx;
 
         lh.opCounterSem.release();
 
@@ -305,13 +190,11 @@ class PendingReadOp implements Enumerati
         }
 
         if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
+            entry.logErrorAndReattemptRead("Error: " + BKException.getMessage(rc), rc);
             return;
         }
 
-        heardFromHosts.add(rctx.to);
-
-        if (entry.complete(rctx.to, buffer)) {
+        if (entry.complete(buffer)) {
             numPendingEntries--;
         }
 
@@ -324,9 +207,6 @@ class PendingReadOp implements Enumerati
     }
 
     private void submitCallback(int code) {
-        if (speculativeReadTimer != null) {
-            speculativeReadTimer.cancel();
-        }
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
     }
     public boolean hasMoreElements() {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Mon Dec  3 09:55:01 2012
@@ -43,7 +43,6 @@ public class ClientConfiguration extends
     // NIO Parameters
     protected final static String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
     protected final static String READ_TIMEOUT = "readTimeout";
-    protected final static String SPECULATIVE_READ_TIMEOUT = "speculativeReadTimeout";
 
     // Number Woker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
@@ -276,39 +275,4 @@ public class ClientConfiguration extends
         setProperty(NUM_WORKER_THREADS, numThreads);
         return this;
     }
-
-    /**
-     * Get the period of time after which a speculative entry read should be triggered.
-     * A speculative entry read is sent to the next replica bookie before
-     * an error or response has been received for the previous entry read request.
-     *
-     * A speculative entry read is only sent if we have not heard from the current
-     * replica bookie during the entire read operation which may comprise of many entries.
-     *
-     * Speculative reads allow the client to avoid having to wait for the connect timeout
-     * in the case that a bookie has failed. It induces higher load on the network and on
-     * bookies. This should be taken into account before changing this configuration value.
-     *
-     * @see org.apache.bookkeeper.client.LedgerHandle#asyncReadEntries
-     * @return the speculative read timeout in milliseconds. Default 2000.
-     */
-    public int getSpeculativeReadTimeout() {
-        return getInt(SPECULATIVE_READ_TIMEOUT, 2000);
-    }
-
-    /**
-     * Set the speculative read timeout. A lower timeout will reduce read latency in the
-     * case of a failed bookie, while increasing the load on bookies and the network.
-     *
-     * The default is 2000 milliseconds. A value of 0 will disable speculative reads
-     * completely.
-     *
-     * @see #getSpeculativeReadTimeout()
-     * @param timeout the timeout value, in milliseconds
-     * @return client configuration
-     */
-    public ClientConfiguration setSpeculativeReadTimeout(int timeout) {
-        setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
-        return this;
-    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1416408&r1=1416407&r2=1416408&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Mon Dec  3 09:55:01 2012
@@ -234,7 +234,7 @@ public abstract class BookKeeperClusterT
     public CountDownLatch sleepBookie(InetSocketAddress addr, final int seconds)
             throws InterruptedException, IOException {
         final CountDownLatch l = new CountDownLatch(1);
-        final String name = "NIOServerFactory-" + addr.getPort();
+        final String name = "BookieJournal-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
         for (final Thread t : allthreads) {
@@ -271,7 +271,7 @@ public abstract class BookKeeperClusterT
      */
     public void sleepBookie(InetSocketAddress addr, final CountDownLatch l)
             throws InterruptedException, IOException {
-        final String name = "NIOServerFactory-" + addr.getPort();
+        final String name = "BookieJournal-" + addr.getPort();
         Thread[] allthreads = new Thread[Thread.activeCount()];
         Thread.enumerate(allthreads);
         for (final Thread t : allthreads) {