You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/05/26 19:16:45 UTC
svn commit: r1342933 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/
bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/
Author: sijie
Date: Sat May 26 17:16:45 2012
New Revision: 1342933
URL: http://svn.apache.org/viewvc?rev=1342933&view=rev
Log:
BOOKKEEPER-258: CompactionTest failed (ivank via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat May 26 17:16:45 2012
@@ -116,6 +116,8 @@ Trunk (unreleased changes)
BOOKKEEPER-266: Review versioning documentation (ivank)
+ BOOKKEEPER-258: CompactionTest failed (ivank via sijie)
+
hedwig-client/
BOOKKEEPER-217: NPE in hedwig client when enable DEBUG (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Sat May 26 17:16:45 2012
@@ -59,7 +59,7 @@ class BookieWatcher implements Watcher,
BookKeeper bk;
ScheduledExecutorService scheduler;
- Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+ HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
SafeRunnable reReadTask = new SafeRunnable() {
@Override
@@ -103,7 +103,7 @@ class BookieWatcher implements Watcher,
}
// Read the bookie addresses into a set for efficient lookup
- Set<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
+ HashSet<InetSocketAddress> newBookieAddrs = new HashSet<InetSocketAddress>();
for (String bookieAddrString : children) {
InetSocketAddress bookieAddr;
try {
@@ -115,9 +115,16 @@ class BookieWatcher implements Watcher,
newBookieAddrs.add(bookieAddr);
}
+ HashSet<InetSocketAddress> deadBookies = (HashSet<InetSocketAddress>)knownBookies.clone();
+ deadBookies.removeAll(newBookieAddrs);
+
synchronized (this) {
knownBookies = newBookieAddrs;
}
+
+ if (bk.getBookieClient() != null) {
+ bk.getBookieClient().closeClients(deadBookies);
+ }
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Sat May 26 17:16:45 2012
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.proto;
*
*/
+import java.util.Set;
+import java.util.HashSet;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,6 +34,8 @@ import org.apache.bookkeeper.proto.Bookk
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -75,6 +79,28 @@ public class BookieClient {
return channel;
}
+ public void closeClients(Set<InetSocketAddress> addrs) {
+ final HashSet<PerChannelBookieClient> clients = new HashSet<PerChannelBookieClient>();
+ for (InetSocketAddress a : addrs) {
+ PerChannelBookieClient c = channels.get(a);
+ if (c != null) {
+ clients.add(c);
+ }
+ }
+
+ if (clients.size() == 0) {
+ return;
+ }
+ executor.submit(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ for (PerChannelBookieClient c : clients) {
+ c.close();
+ }
+ }
+ });
+ }
+
public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId,
final ChannelBuffer toSend, final WriteCallback cb, final Object ctx, final int options) {
final PerChannelBookieClient client = lookupClient(addr);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Sat May 26 17:16:45 2012
@@ -468,10 +468,19 @@ public class PerChannelBookieClient exte
return;
}
if (t instanceof ReadTimeoutException) {
- ctx.getChannel().disconnect();
+ for (CompletionKey key : addCompletions.keySet()) {
+ if (key.shouldTimeout()) {
+ errorOutAddKey(key);
+ }
+ }
+ for (CompletionKey key : readCompletions.keySet()) {
+ if (key.shouldTimeout()) {
+ errorOutReadKey(key);
+ }
+ }
return;
}
-
+
if (t instanceof IOException) {
// these are thrown when a bookie fails, logging them just pollutes
// the logs (the failure is logged from the listeners on the write
@@ -641,15 +650,22 @@ public class PerChannelBookieClient exte
this.ctx = ctx;
}
}
-
+
// visable for testing
- static class CompletionKey {
+ CompletionKey newCompletionKey(long ledgerId, long entryId) {
+ return new CompletionKey(ledgerId, entryId);
+ }
+
+ // visable for testing
+ class CompletionKey {
long ledgerId;
long entryId;
+ final long timeoutAt;
CompletionKey(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
+ this.timeoutAt = System.currentTimeMillis() + (conf.getReadTimeout()*1000);
}
@Override
@@ -669,6 +685,10 @@ public class PerChannelBookieClient exte
public String toString() {
return String.format("LedgerEntry(%d, %d)", ledgerId, entryId);
}
+
+ public boolean shouldTimeout() {
+ return this.timeoutAt <= System.currentTimeMillis();
+ }
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java?rev=1342933&r1=1342932&r2=1342933&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java Sat May 26 17:16:45 2012
@@ -80,7 +80,7 @@ public class TestProtoVersions {
readLatch.countDown();
}
};
- bc.readCompletions.put(new PerChannelBookieClient.CompletionKey(1, 1),
+ bc.readCompletions.put(bc.newCompletionKey(1, 1),
new PerChannelBookieClient.ReadCompletion(cb, this));
int totalHeaderSize = 4 // for the length of the packet