You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/15 07:41:06 UTC
[07/27] hbase git commit: HBASE-19772 ReadOnlyZKClient improvements
HBASE-19772 ReadOnlyZKClient improvements
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70515f53
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70515f53
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70515f53
Branch: refs/heads/HBASE-19064
Commit: 70515f53112599997348aee1d748838f7a78a7fd
Parents: a7f9668
Author: Duo Zhang <pa...@gmail.com>
Authored: Thu Jan 11 11:33:36 2018 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Thu Jan 11 11:37:27 2018 -0800
----------------------------------------------------------------------
.../hbase/zookeeper/ReadOnlyZKClient.java | 58 ++++++++++++--------
1 file changed, 35 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/70515f53/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index 82c011b..0a9544d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -116,6 +117,8 @@ public final class ReadOnlyZKClient implements Closeable {
private ZooKeeper zookeeper;
+ private int pendingRequests = 0;
+
private String getId() {
return String.format("0x%08x", System.identityHashCode(this));
}
@@ -127,12 +130,12 @@ public final class ReadOnlyZKClient implements Closeable {
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
- LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString +
- ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries +
- ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms");
- Thread t = new Thread(this::run, "ReadOnlyZKClient");
- t.setDaemon(true);
- t.start();
+ LOG.info(
+ "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " +
+ "retry interval {} ms, keep alive {} ms",
+ getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
+ Threads.setDaemonThreadRunning(new Thread(this::run),
+ "ReadOnlyZKClient-" + connectString + "@" + getId());
}
private abstract class ZKTask<T> extends Task {
@@ -156,6 +159,7 @@ public final class ReadOnlyZKClient implements Closeable {
@Override
public void exec(ZooKeeper alwaysNull) {
+ pendingRequests--;
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(ret);
@@ -169,19 +173,19 @@ public final class ReadOnlyZKClient implements Closeable {
future.completeExceptionally(KeeperException.create(code, path));
} else {
if (code == Code.SESSIONEXPIRED) {
- LOG.warn(getId() + " session expired, close and reconnect");
+ LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
try {
zk.close();
} catch (InterruptedException e) {
}
}
if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
- LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
- code + ", retries = " + ZKTask.this.retries);
+ LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
+ connectString, operationType, path, code, ZKTask.this.retries);
tasks.add(ZKTask.this);
} else {
- LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
- code + ", retries = " + ZKTask.this.retries + ", give up");
+ LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
+ connectString, operationType, path, code, ZKTask.this.retries);
future.completeExceptionally(KeeperException.create(code, path));
}
}
@@ -205,6 +209,14 @@ public final class ReadOnlyZKClient implements Closeable {
return true;
}
+ protected abstract void doExec(ZooKeeper zk);
+
+ @Override
+ public final void exec(ZooKeeper zk) {
+ pendingRequests++;
+ doExec(zk);
+ }
+
public boolean delay(long intervalMs, int maxRetries) {
if (retries >= maxRetries) {
return false;
@@ -217,14 +229,12 @@ public final class ReadOnlyZKClient implements Closeable {
@Override
public void connectFailed(IOException e) {
if (delay(retryIntervalMs, maxRetries)) {
- LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
- ", retries = " + retries,
- e);
+ LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
+ connectString, operationType, path, retries, e);
tasks.add(this);
} else {
- LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
- ", retries = " + retries + ", give up",
- e);
+ LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
+ connectString, operationType, path, retries, e);
future.completeExceptionally(e);
}
}
@@ -249,7 +259,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<byte[]>(path, future, "get") {
@Override
- public void exec(ZooKeeper zk) {
+ protected void doExec(ZooKeeper zk) {
zk.getData(path, false,
(rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
}
@@ -265,7 +275,7 @@ public final class ReadOnlyZKClient implements Closeable {
tasks.add(new ZKTask<Stat>(path, future, "exists") {
@Override
- public void exec(ZooKeeper zk) {
+ protected void doExec(ZooKeeper zk) {
zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
}
});
@@ -311,9 +321,11 @@ public final class ReadOnlyZKClient implements Closeable {
if (task == CLOSE) {
break;
}
- if (task == null) {
- LOG.info(getId() + " no activities for " + keepAliveTimeMs +
- " ms, close active connection. Will reconnect next time when there are new requests.");
+ if (task == null && pendingRequests == 0) {
+ LOG.info(
+ "{} to {} no activities for {} ms, close active connection. " +
+ "Will reconnect next time when there are new requests",
+ getId(), connectString, keepAliveTimeMs);
closeZk();
continue;
}
@@ -339,7 +351,7 @@ public final class ReadOnlyZKClient implements Closeable {
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- LOG.info("Close zookeeper connection " + getId() + " to " + connectString);
+ LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
tasks.add(CLOSE);
}
}