You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/25 05:56:38 UTC

[GitHub] sijie closed pull request #1048: ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes

sijie closed pull request #1048: ISSUE #1014: LedgerManager.asyncProcessLedgers bug fixes
URL: https://github.com/apache/bookkeeper/pull/1048
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide it once the pull request is merged):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 510f04c31..1aa8e151d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -63,7 +63,11 @@ void asyncProcessLevelNodes(
         zk.sync(path, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
-                if (rc != Code.OK.intValue()) {
+                if (rc == Code.NONODE.intValue()) {
+                    // Raced with node removal
+                    finalCb.processResult(successRc, null, context);
+                    return;
+                } else if (rc != Code.OK.intValue()) {
                     LOG.error("Error syncing path " + path + " when getting its chidren: ",
                               KeeperException.create(KeeperException.Code.get(rc), path));
                     finalCb.processResult(failureRc, null, context);
@@ -74,7 +78,11 @@ public void processResult(int rc, String path, Object ctx) {
                     @Override
                     public void processResult(int rc, String path, Object ctx,
                                               List<String> levelNodes) {
-                        if (rc != Code.OK.intValue()) {
+                        if (rc == Code.NONODE.intValue()) {
+                            // Raced with node removal
+                            finalCb.processResult(successRc, null, context);
+                            return;
+                        } else if (rc != Code.OK.intValue()) {
                             LOG.error("Error polling hash nodes of " + path,
                                       KeeperException.create(KeeperException.Code.get(rc), path));
                             finalCb.processResult(failureRc, null, context);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 56fd03578..6f1872c05 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -466,7 +466,10 @@ protected void asyncProcessLedgersInSingleNode(
         ZkUtils.getChildrenInSingleNode(zk, path, new GenericCallback<List<String>>() {
             @Override
             public void operationComplete(int rc, List<String> ledgerNodes) {
-                if (Code.OK.intValue() != rc) {
+                if (Code.NONODE.intValue() == rc) {
+                    finalCb.processResult(successRc, null, ctx);
+                    return;
+                } else if (Code.OK.intValue() != rc) {
                     finalCb.processResult(failureRc, null, ctx);
                     return;
                 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index b3aa4258a..7d421aa97 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -37,9 +37,11 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -162,6 +164,24 @@ void createLedger(LedgerManager lm, Long ledgerId, Optional<Integer> rcExpected)
         return ret;
     }
 
+    static Set<Long> getLedgerIdsByUsingAsyncProcessLedgers(LedgerManager lm) throws InterruptedException{
+        Set<Long> ledgersReadAsync = ConcurrentHashMap.newKeySet();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger finalRC = new AtomicInteger();
+
+        lm.asyncProcessLedgers((ledgerId, callback) -> {
+            ledgersReadAsync.add(ledgerId);
+            callback.processResult(BKException.Code.OK, null, null);
+        }, (rc, s, obj) -> {
+            finalRC.set(rc);
+            latch.countDown();
+        }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+        latch.await();
+        assertEquals("Final RC of asyncProcessLedgers", BKException.Code.OK, finalRC.get());
+        return ledgersReadAsync;
+    }
+
     @Test
     public void testIterateNoLedgers() throws Exception {
         LedgerManager lm = getLedgerManager();
@@ -186,6 +206,9 @@ public void testSingleLedger() throws Throwable {
         Set<Long> lids = ledgerRangeToSet(lri);
         assertEquals(lids.size(), 1);
         assertEquals(lids.iterator().next().longValue(), id);
+
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", lids, ledgersReadAsync);
     }
 
     @Test
@@ -201,6 +224,9 @@ public void testTwoLedgers() throws Throwable {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
     }
 
     @Test
@@ -217,6 +243,9 @@ public void testSeveralContiguousLedgers() throws Throwable {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
     }
 
     @Test
@@ -306,6 +335,9 @@ public void validateEmptyL4PathSkipped() throws Throwable {
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
 
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
+
         lri = lm.getLedgerRanges();
         int emptyRanges = 0;
         while (lri.hasNext()) {
@@ -352,6 +384,9 @@ public void testWithSeveralIncompletePaths() throws Throwable {
         assertNotNull(lri);
         Set<Long> returnedIds = ledgerRangeToSet(lri);
         assertEquals(ids, returnedIds);
+
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ids, ledgersReadAsync);
     }
 
     @Test
@@ -423,7 +458,12 @@ public void checkConcurrentModifications() throws Throwable {
                         for (long id: mustExist) {
                             assertTrue(returnedIds.contains(id));
                         }
-                    } catch (IOException e) {
+
+                        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(checkerLM);
+                        for (long id: mustExist) {
+                            assertTrue(ledgersReadAsync.contains(id));
+                        }
+                    } catch (IOException | InterruptedException e) {
                         e.printStackTrace();
                         fail("Got exception scanning ledgers: " + e.toString());
                     }
@@ -517,4 +557,20 @@ public void testLedgerManagerFormat() throws Throwable {
         Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain all the invalid znodes created",
                 childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
     }
+
+    @Test
+    public void hierarchicalLedgerManagerAsyncProcessLedgersTest() throws Throwable {
+        Assume.assumeTrue(baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class));
+        LedgerManager lm = getLedgerManager();
+        LedgerRangeIterator lri = lm.getLedgerRanges();
+
+        Set<Long> ledgerIds = new TreeSet<>(Arrays.asList(1234L, 123456789123456789L));
+        for (Long ledgerId : ledgerIds) {
+            createLedger(lm, ledgerId, Optional.of(BKException.Code.OK));
+        }
+        Set<Long> ledgersReadThroughIterator = ledgerRangeToSet(lri);
+        assertEquals("Comparing LedgersIds read through Iterator", ledgerIds, ledgersReadThroughIterator);
+        Set<Long> ledgersReadAsync = getLedgerIdsByUsingAsyncProcessLedgers(lm);
+        assertEquals("Comparing LedgersIds read asynchronously", ledgerIds, ledgersReadAsync);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services