You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/05/26 19:16:45 UTC

svn commit: r1342933 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/

Author: sijie
Date: Sat May 26 17:16:45 2012
New Revision: 1342933

URL: http://svn.apache.org/viewvc?rev=1342933&view=rev
Log:
BOOKKEEPER-258: CompactionTest failed (ivank via sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat May 26 17:16:45 2012
@@ -116,6 +116,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-266: Review versioning documentation (ivank)
 
+        BOOKKEEPER-258: CompactionTest failed (ivank via sijie)
+
       hedwig-client/
 
         BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Sat May 26 17:16:45 2012
@@ -59,7 +59,7 @@ class BookieWatcher implements Watcher, 
     BookKeeper bk;
     ScheduledExecutorService scheduler;
 
-    Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+    HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
 
     SafeRunnable reReadTask = new SafeRunnable() {
         @Override
@@ -103,7 +103,7 @@ class BookieWatcher implements Watcher, 
         }
 
         // Read the bookie addresses into a set for efficient lookup
-        Set<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
+        HashSet<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
         for (String bookieAddrString : children) {
             InetSocketAddress bookieAddr;
             try {
@@ -115,9 +115,16 @@ class BookieWatcher implements Watcher, 
             newBookieAddrs.add(bookieAddr);
         }
 
+        HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+        deadBookies.removeAll(newBookieAddrs);
+
         synchronized (this) {
             knownBookies = newBookieAddrs;
         }
+
+        if (bk.getBookieClient() != null) {
+            bk.getBookieClient().closeClients(deadBookies);
+        }
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Sat May 26 17:16:45 2012
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.proto;
  *
  */
 
+import java.util.Set;
+import java.util.HashSet;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +34,8 @@ import org.apache.bookkeeper.proto.Bookk
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -75,6 +79,28 @@ public class BookieClient {
         return channel;
     }
 
+    public void closeClients(Set<InetSocketAddress> addrs) {
+        final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
+        for (InetSocketAddress a : addrs) {
+            PerChannelBookieClient c = channels.get(a);
+            if (c != null) {
+                clients.add(c);
+            }
+        }
+
+        if (clients.size() == 0) {
+            return;
+        }
+        executor.submit(new SafeRunnable() {
+                @Override
+                public void safeRun() {
+                    for (PerChannelBookieClient c : clients) {
+                        c.close();
+                    }
+                }
+            });
+    }
+
     public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId,
             final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
         final PerChannelBookieClient client = lookupClient(addr);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Sat May 26 17:16:45 2012
@@ -468,10 +468,19 @@ public class PerChannelBookieClient exte
             return;
         }
         if (t instanceof ReadTimeoutException) {
-            ctx.getChannel().disconnect();
+            for (CompletionKey key : addCompletions.keySet()) {
+                if (key.shouldTimeout()) {
+                    errorOutAddKey(key);
+                }
+            }
+            for (CompletionKey key : readCompletions.keySet()) {
+                if (key.shouldTimeout()) {
+                    errorOutReadKey(key);
+                }
+            }
             return;
         }
- 
+
         if (t instanceof IOException) {
             // these are thrown when a bookie fails, logging them just pollutes
             // the logs (the failure is logged from the listeners on the write
@@ -641,15 +650,22 @@ public class PerChannelBookieClient exte
             this.ctx = ctx;
         }
     }
-    
+
     // visable for testing
-    static class CompletionKey {
+    CompletionKey newCompletionKey(long ledgerId, long entryId) {
+        return new CompletionKey(ledgerId, entryId);
+    }
+
+    // visible for testing
+    class CompletionKey {
         long ledgerId;
         long entryId;
+        final long timeoutAt;
 
         CompletionKey(long ledgerId, long entryId) {
             this.ledgerId = ledgerId;
             this.entryId = entryId;
+            this.timeoutAt = System.currentTimeMillis() + (conf.getReadTimeout()*1000);
         }
 
         @Override
@@ -669,6 +685,10 @@ public class PerChannelBookieClient exte
         public String toString() {
             return String.format("LedgerEntry(%d, %d)", ledgerId, entryId);
         }
+
+        public boolean shouldTimeout() {
+            return this.timeoutAt <= System.currentTimeMillis();
+        }
     }
 
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java Sat May 26 17:16:45 2012
@@ -80,7 +80,7 @@ public class TestProtoVersions {
                     readLatch.countDown();
                 }
             };
-        bc.readCompletions.put(new PerChannelBookieClient.CompletionKey(1, 1), 
+        bc.readCompletions.put(bc.newCompletionKey(1, 1),
                                new PerChannelBookieClient.ReadCompletion(cb, this));
         
         int totalHeaderSize = 4 // for the length of the packet