You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/06/29 21:57:24 UTC

[bookkeeper] branch master updated: ISSUE #207: change the HashSet to BitSet to handle duplicated bookies on reading entries

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 75aca23  ISSUE #207: change the HashSet to BitSet to handle duplicated bookies on reading entries
75aca23 is described below

commit 75aca2397e43ec321a049aaec514c99cc9fad235
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jun 29 14:57:16 2017 -0700

    ISSUE #207: change the HashSet to BitSet to handle duplicated bookies on reading entries
    
    Descriptions of the changes in this PR:
    
    Problem:
    
    If there happen to be duplicated bookies in the same ledger (this happened before, when ensemble change still had problems), the read doesn't work as expected. It hangs forever.
    
    Solution:
    
    Change the HashSet to a BitSet, so that the set of bookies we have already heard from is tracked by bookieIndex rather than by the bookie's socket address.
    
    ---
    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
    
    - [x] Make sure the PR title is formatted like:
        `<Issue #>: Description of pull request`
        `e.g. Issue 123: Description ...`
    - [x] Make sure tests pass via `mvn clean apache-rat:check install findbugs:check`.
    - [x] Replace `<Issue #>` in the title with the actual Issue number, if there is one.
    
    ---
    
    Author: Sijie Guo <si...@apache.org>
    Author: Sijie Guo <si...@twitter.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Matteo Merli <mm...@apache.org>
    
    This closes #199 from sijie/client_changes/bitset, closes #207
---
 .../apache/bookkeeper/client/PendingReadOp.java    | 83 ++++++++++---------
 .../apache/bookkeeper/client/TestSequenceRead.java | 94 ++++++++++++++++++++++
 .../bookkeeper/client/TestSpeculativeRead.java     | 20 ++---
 3 files changed, 146 insertions(+), 51 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 306c5dd..0dd1f67 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -62,6 +62,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
     private ScheduledFuture<?> speculativeTask = null;
     Queue<LedgerEntryRequest> seq;
     Set<BookieSocketAddress> heardFromHosts;
+    BitSet heardFromHostsBitSet;
     ReadCallback cb;
     Object ctx;
     LedgerHandle lh;
@@ -101,6 +102,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         /**
          * Complete the read request from <i>host</i>.
          *
+         * @param bookieIndex
+         *          bookie index
          * @param host
          *          host that respond the read
          * @param buffer
@@ -108,12 +111,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          * @return return true if we managed to complete the entry;
          *         otherwise return false if the read entry is not complete or it is already completed before
          */
-        boolean complete(BookieSocketAddress host, final ByteBuf buffer) {
+        boolean complete(int bookieIndex, BookieSocketAddress host, final ByteBuf buffer) {
             ByteBuf content;
             try {
                 content = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
             } catch (BKDigestMatchException e) {
-                logErrorAndReattemptRead(host, "Mac mismatch", BKException.Code.DigestMatchException);
+                logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException);
                 buffer.release();
                 return false;
             }
@@ -153,6 +156,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         /**
          * Log error <i>errMsg</i> and reattempt read from <i>host</i>.
          *
+         * @param bookieIndex
+         *          bookie index
          * @param host
          *          host that just respond
          * @param errMsg
@@ -160,7 +165,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          * @param rc
          *          read result code
          */
-        synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
             if (BKException.Code.OK == firstError ||
                 BKException.Code.NoSuchEntryException == firstError ||
                 BKException.Code.NoSuchLedgerExistsException == firstError) {
@@ -192,11 +197,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          * Send to next replica speculatively, if required and possible.
          * This returns the host we may have sent to for unit testing.
          *
-         * @param heardFromHosts
+         * @param heardFromHostsBitSet
          *      the set of hosts that we already received responses.
          * @return host we sent to if we sent. null otherwise.
          */
-        abstract BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts);
+        abstract BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet);
 
         /**
          * Whether the read request completed.
@@ -220,6 +225,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         public String toString() {
             return String.format("L%d-E%d", ledgerId, entryId);
         }
+
     }
 
     class ParallelReadRequest extends LedgerEntryRequest {
@@ -236,7 +242,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             for (int bookieIndex : writeSet) {
                 BookieSocketAddress to = ensemble.get(bookieIndex);
                 try {
-                    sendReadTo(to, this);
+                    sendReadTo(bookieIndex, to, this);
                 } catch (InterruptedException ie) {
                     LOG.error("Interrupted reading entry {} : ", this, ie);
                     Thread.currentThread().interrupt();
@@ -247,8 +253,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         }
 
         @Override
-        synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
-            super.logErrorAndReattemptRead(host, errMsg, rc);
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
+            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
             --numPendings;
             // if received all responses or this entry doesn't meet quorum write, complete the request.
             if (numMissedEntryReads > maxMissedReadsAllowed || numPendings == 0) {
@@ -262,7 +268,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         }
 
         @Override
-        BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
+        BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) {
             // no speculative read
             return null;
         }
@@ -282,11 +288,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize());
         }
 
-        private int getReplicaIndex(BookieSocketAddress host) {
-            int bookieIndex = ensemble.indexOf(host);
-            if (bookieIndex == -1) {
-                return NOT_FOUND;
-            }
+        private synchronized int getNextReplicaIndexToReadFrom() {
+            return nextReplicaIndexToReadFrom;
+        }
+
+        private int getReplicaIndex(int bookieIndex) {
             return writeSet.indexOf(bookieIndex);
         }
 
@@ -301,17 +307,6 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
             return b;
         }
 
-        private BitSet getHeardFromBitSet(Set<BookieSocketAddress> heardFromHosts) {
-            BitSet b = new BitSet(ensemble.size());
-            for (BookieSocketAddress i : heardFromHosts) {
-                int index = ensemble.indexOf(i);
-                if (index != -1) {
-                    b.set(index);
-                }
-            }
-            return b;
-        }
-
         private boolean readsOutstanding() {
             return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0;
         }
@@ -322,13 +317,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
          * @return host we sent to if we sent. null otherwise.
          */
         @Override
-        synchronized BookieSocketAddress maybeSendSpeculativeRead(Set<BookieSocketAddress> heardFromHosts) {
+        synchronized BookieSocketAddress maybeSendSpeculativeRead(BitSet heardFrom) {
             if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) {
                 return null;
             }
 
             BitSet sentTo = getSentToBitSet();
-            BitSet heardFrom = getHeardFromBitSet(heardFromHosts);
             sentTo.and(heardFrom);
 
             // only send another read, if we have had no response at all (even for other entries)
@@ -367,7 +361,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
 
             try {
                 BookieSocketAddress to = ensemble.get(bookieIndex);
-                sendReadTo(to, this);
+                sendReadTo(bookieIndex, to, this);
                 sentReplicas.set(replica);
                 return to;
             } catch (InterruptedException ie) {
@@ -379,10 +373,10 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         }
 
         @Override
-        synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
-            super.logErrorAndReattemptRead(host, errMsg, rc);
+        synchronized void logErrorAndReattemptRead(int bookieIndex, BookieSocketAddress host, String errMsg, int rc) {
+            super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
 
-            int replica = getReplicaIndex(host);
+            int replica = getReplicaIndex(bookieIndex);
             if (replica == NOT_FOUND) {
                 LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble);
                 return;
@@ -393,6 +387,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                 sendNextRead();
             }
         }
+
     }
 
     PendingReadOp(LedgerHandle lh, ScheduledExecutorService scheduler,
@@ -408,7 +403,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         maxMissedReadsAllowed = getLedgerMetadata().getWriteQuorumSize()
                 - getLedgerMetadata().getAckQuorumSize();
         speculativeReadTimeout = lh.bk.getConf().getSpeculativeReadTimeout();
-        heardFromHosts = new HashSet<BookieSocketAddress>();
+        heardFromHosts = new HashSet<>();
+        heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize());
 
         readOpLogger = lh.bk.getReadOpLogger();
     }
@@ -440,12 +436,12 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                     int x = 0;
                     for (LedgerEntryRequest r : seq) {
                         if (!r.isComplete()) {
-                            if (null == r.maybeSendSpeculativeRead(heardFromHosts)) {
+                            if (null == r.maybeSendSpeculativeRead(heardFromHostsBitSet)) {
                                 // Subsequent speculative read will not materialize anyway
                                 cancelSpeculativeTask(false);
                             } else {
                                 if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Send speculative read for {}. Hosts heard are {}.", r, heardFromHosts);
+                                    LOG.debug("Send speculative read for {}. Hosts heard are {}.", r, heardFromHostsBitSet);
                                 }
                                 ++x;
                             }
@@ -454,7 +450,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                     if (x > 0) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Send {} speculative reads for ledger {} ({}, {}). Hosts heard are {}.",
-                                    new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHosts });
+                                    new Object[] { x, lh.getId(), startEntryId, endEntryId, heardFromHostsBitSet });
                         }
                     }
                 }
@@ -491,11 +487,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
     }
 
     private static class ReadContext implements ReadEntryCallbackCtx {
+        final int bookieIndex;
         final BookieSocketAddress to;
         final LedgerEntryRequest entry;
         long lac = LedgerHandle.INVALID_ENTRY_ID;
 
-        ReadContext(BookieSocketAddress to, LedgerEntryRequest entry) {
+        ReadContext(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) {
+            this.bookieIndex = bookieIndex;
             this.to = to;
             this.entry = entry;
         }
@@ -511,13 +509,13 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         }
     }
 
-    void sendReadTo(BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
+    void sendReadTo(int bookieIndex, BookieSocketAddress to, LedgerEntryRequest entry) throws InterruptedException {
         if (lh.throttler != null) {
             lh.throttler.acquire();
         }
 
         lh.bk.bookieClient.readEntry(to, lh.ledgerId, entry.entryId,
-                                     this, new ReadContext(to, entry));
+                                     this, new ReadContext(bookieIndex, to, entry));
     }
 
     @Override
@@ -526,13 +524,14 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
         final LedgerEntryRequest entry = rctx.entry;
 
         if (rc != BKException.Code.OK) {
-            entry.logErrorAndReattemptRead(rctx.to, "Error: " + BKException.getMessage(rc), rc);
+            entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc);
             return;
         }
 
         heardFromHosts.add(rctx.to);
+        heardFromHostsBitSet.set(rctx.bookieIndex, true);
 
-        if (entry.complete(rctx.to, buffer)) {
+        if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) {
             lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L);
             submitCallback(BKException.Code.OK);
         }
@@ -564,8 +563,8 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
                     break;
                 }
             }
-            LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {}. First unread entry is {}",
-                    new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, firstUnread });
+            LOG.error("Read of ledger entry failed: L{} E{}-E{}, Heard from {} : bitset = {}. First unread entry is {}",
+                    new Object[] { lh.getId(), startEntryId, endEntryId, heardFromHosts, heardFromHostsBitSet, firstUnread });
             readOpLogger.registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS);
         } else {
             readOpLogger.registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
new file mode 100644
index 0000000..5b035b7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test reading an entry from replicas in sequence way.
+ */
+public class TestSequenceRead extends BookKeeperClusterTestCase {
+
+    static final Logger logger = LoggerFactory.getLogger(TestSequenceRead.class);
+
+    final DigestType digestType;
+    final byte[] passwd = "sequence-read".getBytes();
+
+    public TestSequenceRead() {
+        super(5);
+        this.digestType = DigestType.CRC32;
+    }
+
+    private LedgerHandle createLedgerWithDuplicatedBookies() throws Exception {
+        final LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, passwd);
+        // introduce duplicated bookies in an ensemble.
+        SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh.getLedgerMetadata().getEnsembles();
+        SortedMap<Long, ArrayList<BookieSocketAddress>> newEnsembles = new TreeMap<Long, ArrayList<BookieSocketAddress>>();
+        for (Map.Entry<Long, ArrayList<BookieSocketAddress>> entry : ensembles.entrySet()) {
+            ArrayList<BookieSocketAddress> newList = new ArrayList<BookieSocketAddress>(entry.getValue().size());
+            BookieSocketAddress firstBookie = entry.getValue().get(0);
+            for (BookieSocketAddress ignored : entry.getValue()) {
+                newList.add(firstBookie);
+            }
+            newEnsembles.put(entry.getKey(), newList);
+        }
+        lh.getLedgerMetadata().setEnsembles(newEnsembles);
+        // update the ledger metadata with duplicated bookies
+        final CountDownLatch latch = new CountDownLatch(1);
+        bkc.getLedgerManager().writeLedgerMetadata(lh.getId(), lh.getLedgerMetadata(), new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+                if (BKException.Code.OK == rc) {
+                    latch.countDown();
+                } else {
+                    logger.error("Error on writing ledger metadata for ledger {} : ", lh.getId(), BKException.getMessage(rc));
+                }
+            }
+        });
+        latch.await();
+        logger.info("Update ledger metadata with duplicated bookies for ledger {}.", lh.getId());
+        return lh;
+    }
+
+    @Test(timeout = 60000)
+    public void testSequenceReadOnDuplicatedBookies() throws Exception {
+        final LedgerHandle lh = createLedgerWithDuplicatedBookies();
+
+        // should be able to open the ledger even it has duplicated bookies
+        final LedgerHandle readLh = bkc.openLedger(lh.getId(), digestType, passwd);
+        assertEquals(LedgerHandle.INVALID_ENTRY_ID, readLh.getLastAddConfirmed());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
index ee77d37..c0b8e49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeRead.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,19 +18,20 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BaseTestCase;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -286,10 +285,13 @@ public class TestSpeculativeRead extends BaseTestCase {
         LedgerHandle l = bkspec.openLedger(id, digestType, passwd);
 
         ArrayList<BookieSocketAddress> ensemble = l.getLedgerMetadata().getEnsembles().get(0L);
-        Set<BookieSocketAddress> allHosts = new HashSet<BookieSocketAddress>(ensemble);
-        Set<BookieSocketAddress> noHost = new HashSet<BookieSocketAddress>();
-        Set<BookieSocketAddress> secondHostOnly = new HashSet<BookieSocketAddress>();
-        secondHostOnly.add(ensemble.get(1));
+        BitSet allHosts = new BitSet(ensemble.size());
+        for (int i = 0; i < ensemble.size(); i++) {
+            allHosts.set(i, true);
+        }
+        BitSet noHost = new BitSet(ensemble.size());
+        BitSet secondHostOnly = new BitSet(ensemble.size());
+        secondHostOnly.set(1, true);
         PendingReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null;
         try {
             LatchCallback latch0 = new LatchCallback();

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].