You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/16 17:01:09 UTC

svn commit: r1398834 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/

Author: ivank
Date: Tue Oct 16 15:01:08 2012
New Revision: 1398834

URL: http://svn.apache.org/viewvc?rev=1398834&view=rev
Log:
BOOKKEEPER-417: Hierarchical zk underreplication manager should clean up its hierarchy when done to allow for fast acquisition of underreplicated entries (ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1398834&r1=1398833&r2=1398834&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Oct 16 15:01:08 2012
@@ -92,6 +92,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-418: Store hostname of locker in replication lock (ivank)
 
+        BOOKKEEPER-417: Hierarchical zk underreplication manager should clean up its hierarchy when done to allow for fast acquisition of underreplicated entries (ivank)
+
       hedwig-protocol:
 
         BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1398834&r1=1398833&r2=1398834&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Tue Oct 16 15:01:08 2012
@@ -38,6 +38,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>13.0.1</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.8.1</version>

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1398834&r1=1398833&r2=1398834&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Tue Oct 16 15:01:08 2012
@@ -36,6 +36,7 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.ZooDefs.Ids;
 
 import com.google.protobuf.TextFormat;
+import com.google.common.base.Joiner;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -46,6 +47,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.Map;
 import java.util.List;
 import java.util.Collections;
+import java.util.Arrays;
 
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
@@ -260,6 +262,24 @@ public class ZkLedgerUnderreplicationMan
             Lock l = heldLocks.get(ledgerId);
             if (l != null) {
                 zkc.delete(getUrLedgerZnode(ledgerId), l.getLedgerZNodeVersion());
+
+                try {
+                    // clean up the hierarchy
+                    String parts[] = getUrLedgerZnode(ledgerId).split("/");
+                    for (int i = 1; i <= 4; i++) {
+                        String p[] = Arrays.copyOf(parts, parts.length - i);
+                        String path = Joiner.on("/").join(p);
+                        Stat s = zkc.exists(path, null);
+                        if (s != null) {
+                            zkc.delete(path, s.getVersion());
+                        }
+                    }
+                } catch (KeeperException.NotEmptyException nee) {
+                    // This can happen when cleaning up the hierarchy.
+                    // It's safe to ignore, it simply means another
+                    // ledger in the same hierarchy has been marked as
+                    // underreplicated.
+                }
             }
         } catch (KeeperException.NoNodeException nne) {
             // this is ok
@@ -281,7 +301,15 @@ public class ZkLedgerUnderreplicationMan
     private long getLedgerToRereplicateFromHierarchy(String parent, long depth, Watcher w)
             throws KeeperException, InterruptedException {
         if (depth == 4) {
-            List<String> children = zkc.getChildren(parent, w);
+            List<String> children;
+            try {
+                children = zkc.getChildren(parent, w);
+            } catch (KeeperException.NoNodeException nne) {
+                // can occur if another underreplicated ledger's
+                // hierarchy is being cleaned up
+                return -1;
+            }
+
             Collections.shuffle(children);
 
             while (children.size() > 0) {
@@ -314,7 +342,15 @@ public class ZkLedgerUnderreplicationMan
             return -1;
         }
 
-        List<String> children = zkc.getChildren(parent, w);
+        List<String> children;
+        try {
+            children = zkc.getChildren(parent, w);
+        } catch (KeeperException.NoNodeException nne) {
+            // can occur if another underreplicated ledger's
+            // hierarchy is being cleaned up
+            return -1;
+        }
+
         Collections.shuffle(children);
 
         while (children.size() > 0) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1398834&r1=1398833&r2=1398834&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Tue Oct 16 15:01:08 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.replicatio
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.nio.charset.Charset;
@@ -39,6 +40,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -584,6 +587,117 @@ public class TestLedgerUnderreplicationM
         }
     }
 
+    /**
+     * Test that the hierarchy gets cleaned up as ledgers
+     * are marked as fully replicated
+     */
+    @Test
+    public void testHierarchyCleanup() throws Exception {
+        final LedgerUnderreplicationManager replicaMgr = lmf1
+            .newLedgerUnderreplicationManager();
+        // 4 ledgers, 2 in the same hierarchy
+        long[] ledgers = { 0x00000000deadbeefL, 0x00000000deadbeeeL,
+                           0x00000000beefcafeL, 0x00000000cafed00dL };
+
+        for (long l : ledgers) {
+            replicaMgr.markLedgerUnderreplicated(l, "localhost:3181");
+        }
+        // can't simply test top level as we are limited to ledger
+        // ids no larger than an int
+        String testPath = urLedgerPath + "/0000/0000";
+        List<String> children = zkc1.getChildren(testPath, false);
+        assertEquals("Wrong number of hierarchies", 3, children.size());
+
+        int marked = 0;
+        while (marked < 3) {
+            long l = replicaMgr.getLedgerToRereplicate();
+            if (l != ledgers[0]) {
+                replicaMgr.markLedgerReplicated(l);
+                marked++;
+            } else {
+                replicaMgr.releaseUnderreplicatedLedger(l);
+            }
+        }
+        children = zkc1.getChildren(testPath, false);
+        assertEquals("Wrong number of hierarchies", 1, children.size());
+
+        long l = replicaMgr.getLedgerToRereplicate();
+        assertEquals("Got wrong ledger", ledgers[0], l);
+        replicaMgr.markLedgerReplicated(l);
+
+        children = zkc1.getChildren(urLedgerPath, false);
+        assertEquals("All hierarchies should be cleaned up", 0, children.size());
+    }
+
+    /**
+     * Test that as the hierarchy gets cleaned up, it doesn't interfere
+     * with the marking of other ledgers as underreplicated
+     */
+    @Test(timeout = 90000)
+    public void testHierarchyCleanupInterference() throws Exception {
+        final LedgerUnderreplicationManager replicaMgr1 = lmf1
+            .newLedgerUnderreplicationManager();
+        final LedgerUnderreplicationManager replicaMgr2 = lmf2
+            .newLedgerUnderreplicationManager();
+
+        final int iterations = 1000;
+        final AtomicBoolean threadFailed = new AtomicBoolean(false);
+        Thread markUnder = new Thread() {
+                public void run() {
+                    long l = 1;
+                    try {
+                        for (int i = 0; i < iterations; i++) {
+                            replicaMgr1.markLedgerUnderreplicated(l, "localhost:3181");
+                            l += 10000;
+                        }
+                    } catch (Exception e) {
+                        LOG.error("markUnder Thread failed with exception", e);
+                        threadFailed.set(true);
+                        return;
+                    }
+                }
+            };
+        final AtomicInteger processed = new AtomicInteger(0);
+        Thread markRepl = new Thread() {
+                public void run() {
+                    try {
+                        for (int i = 0; i < iterations; i++) {
+                            long l = replicaMgr2.getLedgerToRereplicate();
+                            replicaMgr2.markLedgerReplicated(l);
+                            processed.incrementAndGet();
+                        }
+                    } catch (Exception e) {
+                        LOG.error("markRepl Thread failed with exception", e);
+                        threadFailed.set(true);
+                        return;
+                    }
+                }
+            };
+        markRepl.setDaemon(true);
+        markUnder.setDaemon(true);
+
+        markRepl.start();
+        markUnder.start();
+        markUnder.join();
+        assertFalse("Thread failed to complete", threadFailed.get());
+
+        int lastProcessed = 0;
+        while (true) {
+            markRepl.join(10000);
+            if (!markRepl.isAlive()) {
+                break;
+            }
+            assertFalse("markRepl thread not progressing", lastProcessed == processed.get());
+        }
+        assertFalse("Thread failed to complete", threadFailed.get());
+
+        List<String> children = zkc1.getChildren(urLedgerPath, false);
+        for (String s : children) {
+            LOG.info("s: {}", s);
+        }
+        assertEquals("All hierarchies should be cleaned up", 0, children.size());
+    }
+
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
             throws KeeperException, InterruptedException,
             CompatibilityException, UnavailableException {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1398834&r1=1398833&r2=1398834&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Tue Oct 16 15:01:08 2012
@@ -502,7 +502,13 @@ public class TestReplicationWorker exten
 
     private boolean isLedgerInUnderReplication(long id, String basePath)
             throws KeeperException, InterruptedException {
-        List<String> children = zkc.getChildren(basePath, true);
+        List<String> children;
+        try {
+            children = zkc.getChildren(basePath, true);
+        } catch (KeeperException.NoNodeException nne) {
+            return false;
+        }
+
         boolean isMatched = false;
         for (String child : children) {
             if (child.startsWith("urL") && child.contains(String.valueOf(id))) {
@@ -510,8 +516,12 @@ public class TestReplicationWorker exten
                 break;
             } else {
                 String path = basePath + '/' + child;
-                if (zkc.getChildren(path, false).size() > 0) {
-                    isMatched = isLedgerInUnderReplication(id, path);
+                try {
+                    if (zkc.getChildren(path, false).size() > 0) {
+                        isMatched = isLedgerInUnderReplication(id, path);
+                    }
+                } catch (KeeperException.NoNodeException nne) {
+                    return false;
                 }
             }