You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/05/15 19:35:34 UTC
bookkeeper git commit: BOOKKEEPER-1061: BookieWatcher should not do
ZK blocking operations from ZK async callback thread
Repository: bookkeeper
Updated Branches:
refs/heads/master d86351371 -> 5d43260e8
BOOKKEEPER-1061: BookieWatcher should not do ZK blocking operations from ZK async callback thread
In some cases, the BookieWatcher can cause the ZK event thread to get stuck. This happens when a blocking ZK request is issued from a ZK callback thread.
We should move the blocking requests to a separate executor to avoid deadlocking the ZK client.
Author: Matteo Merli <mm...@apache.org>
Reviewers: Jia Zhai <None>, Sijie Guo <si...@apache.org>
Closes #149 from merlimat/bookie-watcher-thread
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5d43260e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5d43260e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5d43260e
Branch: refs/heads/master
Commit: 5d43260e84c6121df72c0ee4c844651bfb726638
Parents: d863513
Author: Matteo Merli <mm...@apache.org>
Authored: Mon May 15 12:35:26 2017 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Mon May 15 12:35:26 2017 -0700
----------------------------------------------------------------------
.../apache/bookkeeper/client/BookieWatcher.java | 29 +++++++++++---------
1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5d43260e/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index ae0ac50..e15f49a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,6 +17,8 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -183,14 +185,18 @@ class BookieWatcher implements Watcher, ChildrenCallback {
HashSet<BookieSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
- synchronized (this) {
- Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
- placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
- if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
- // start collecting bookieInfo for the newly joined bookies, if any
- bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
+ // Update watcher outside ZK callback thread, to avoid deadlock in case some other
+ // component is trying to do a blocking ZK operation
+ bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
+ synchronized (BookieWatcher.this) {
+ Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+ placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
+ if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
+ // start collecting bookieInfo for the newly joined bookies, if any
+ bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
+ }
}
- }
+ }));
// we don't need to close clients here, because:
// a. the dead bookies will be removed from topology, which will not be used in new ensemble.
@@ -236,13 +242,10 @@ class BookieWatcher implements Watcher, ChildrenCallback {
final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
readBookies(new ChildrenCallback() {
public void processResult(int rc, String path, Object ctx, List<String> children) {
- try {
+ bk.mainWorkerPool.submitOrdered(path, safeRun(() -> {
BookieWatcher.this.processResult(rc, path, ctx, children);
- queue.put(rc);
- } catch (InterruptedException e) {
- logger.error("Interruped when trying to read bookies in a blocking fashion");
- throw new RuntimeException(e);
- }
+ queue.add(rc);
+ }));
}
});
int rc = queue.take();