You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/15 10:33:34 UTC

[08/48] hbase git commit: HBASE-19772 Do not close connection to zk when there are still pending request in ReadOnlyZKClient

HBASE-19772 Do not close connection to zk when there are still pending request in ReadOnlyZKClient


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/842f794a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/842f794a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/842f794a

Branch: refs/heads/HBASE-19397-branch-2
Commit: 842f794a627dcbfc72a78f1742645271d4d4bca6
Parents: f91589d
Author: zhangduo <zh...@apache.org>
Authored: Fri Jan 12 09:43:56 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Jan 14 17:08:30 2018 +0800

----------------------------------------------------------------------
 .../hbase/zookeeper/ReadOnlyZKClient.java       | 79 ++++++++++----------
 .../hbase/zookeeper/TestReadOnlyZKClient.java   | 71 +++++++++++++-----
 2 files changed, 92 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/842f794a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index 82c011b..ad70740 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -29,9 +29,8 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -39,6 +38,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -114,7 +114,10 @@ public final class ReadOnlyZKClient implements Closeable {
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
-  private ZooKeeper zookeeper;
+  @VisibleForTesting
+  ZooKeeper zookeeper;
+
+  private int pendingRequests = 0;
 
   private String getId() {
     return String.format("0x%08x", System.identityHashCode(this));
@@ -127,12 +130,12 @@ public final class ReadOnlyZKClient implements Closeable {
     this.retryIntervalMs =
         conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
     this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
-    LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString +
-        ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries +
-        ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms");
-    Thread t = new Thread(this::run, "ReadOnlyZKClient");
-    t.setDaemon(true);
-    t.start();
+    LOG.info(
+      "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " +
+        "retry interval {} ms, keep alive {} ms",
+      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
+    Threads.setDaemonThreadRunning(new Thread(this::run),
+      "ReadOnlyZKClient-" + connectString + "@" + getId());
   }
 
   private abstract class ZKTask<T> extends Task {
@@ -156,6 +159,7 @@ public final class ReadOnlyZKClient implements Closeable {
 
         @Override
         public void exec(ZooKeeper alwaysNull) {
+          pendingRequests--;
           Code code = Code.get(rc);
           if (code == Code.OK) {
             future.complete(ret);
@@ -169,19 +173,19 @@ public final class ReadOnlyZKClient implements Closeable {
             future.completeExceptionally(KeeperException.create(code, path));
           } else {
             if (code == Code.SESSIONEXPIRED) {
-              LOG.warn(getId() + " session expired, close and reconnect");
+              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
               try {
                 zk.close();
               } catch (InterruptedException e) {
               }
             }
             if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
-              LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
-                  code + ", retries = " + ZKTask.this.retries);
+              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
+                connectString, operationType, path, code, ZKTask.this.retries);
               tasks.add(ZKTask.this);
             } else {
-              LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
-                  code + ", retries = " + ZKTask.this.retries + ", give up");
+              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
+                connectString, operationType, path, code, ZKTask.this.retries);
               future.completeExceptionally(KeeperException.create(code, path));
             }
           }
@@ -205,6 +209,14 @@ public final class ReadOnlyZKClient implements Closeable {
       return true;
     }
 
+    protected abstract void doExec(ZooKeeper zk);
+
+    @Override
+    public final void exec(ZooKeeper zk) {
+      pendingRequests++;
+      doExec(zk);
+    }
+
     public boolean delay(long intervalMs, int maxRetries) {
       if (retries >= maxRetries) {
         return false;
@@ -217,14 +229,12 @@ public final class ReadOnlyZKClient implements Closeable {
     @Override
     public void connectFailed(IOException e) {
       if (delay(retryIntervalMs, maxRetries)) {
-        LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
-            ", retries = " + retries,
-          e);
+        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(),
+          connectString, operationType, path, retries, e);
         tasks.add(this);
       } else {
-        LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
-            ", retries = " + retries + ", give up",
-          e);
+        LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(),
+          connectString, operationType, path, retries, e);
         future.completeExceptionally(e);
       }
     }
@@ -249,7 +259,7 @@ public final class ReadOnlyZKClient implements Closeable {
     tasks.add(new ZKTask<byte[]>(path, future, "get") {
 
       @Override
-      public void exec(ZooKeeper zk) {
+      protected void doExec(ZooKeeper zk) {
         zk.getData(path, false,
             (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
       }
@@ -265,7 +275,7 @@ public final class ReadOnlyZKClient implements Closeable {
     tasks.add(new ZKTask<Stat>(path, future, "exists") {
 
       @Override
-      public void exec(ZooKeeper zk) {
+      protected void doExec(ZooKeeper zk) {
         zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
       }
     });
@@ -286,16 +296,6 @@ public final class ReadOnlyZKClient implements Closeable {
     // may be closed when session expired
     if (zookeeper == null || !zookeeper.getState().isAlive()) {
       zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
-      int timeout = 10000;
-      try {
-        // Before returning, try and ensure we are connected. Don't wait long in case
-        // we are trying to connect to a cluster that is down. If we fail to connect,
-        // just catch the exception and carry-on. The first usage will fail and we'll
-        // cleanup.
-        zookeeper = ZooKeeperHelper.ensureConnectedZooKeeper(zookeeper, timeout);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Failed connecting after waiting " + timeout + "ms; " + zookeeper);
-      }
     }
     return zookeeper;
   }
@@ -311,9 +311,11 @@ public final class ReadOnlyZKClient implements Closeable {
       if (task == CLOSE) {
         break;
       }
-      if (task == null) {
-        LOG.info(getId() + " no activities for " + keepAliveTimeMs +
-            " ms, close active connection. Will reconnect next time when there are new requests.");
+      if (task == null && pendingRequests == 0) {
+        LOG.debug(
+          "{} to {} no activities for {} ms, close active connection. " +
+            "Will reconnect next time when there are new requests",
+          getId(), connectString, keepAliveTimeMs);
         closeZk();
         continue;
       }
@@ -339,17 +341,12 @@ public final class ReadOnlyZKClient implements Closeable {
   @Override
   public void close() {
     if (closed.compareAndSet(false, true)) {
-      LOG.info("Close zookeeper connection " + getId() + " to " + connectString);
+      LOG.info("Close zookeeper connection {} to {}", getId(), connectString);
       tasks.add(CLOSE);
     }
   }
 
   @VisibleForTesting
-  ZooKeeper getZooKeeper() {
-    return zookeeper;
-  }
-
-  @VisibleForTesting
   public String getConnectString() {
     return connectString;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/842f794a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
index c478121..211e776 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,26 +27,36 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ZKTests;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 @Category({ ZKTests.class, MediumTests.class })
 public class TestReadOnlyZKClient {
@@ -67,8 +77,7 @@ public class TestReadOnlyZKClient {
   public static void setUp() throws Exception {
     PORT = UTIL.startMiniZKCluster().getClientPort();
 
-    ZooKeeper zk = ZooKeeperHelper.
-        getConnectedZooKeeper("localhost:" + PORT, 10000);
+    ZooKeeper zk = ZooKeeperHelper.getConnectedZooKeeper("localhost:" + PORT, 10000);
     DATA = new byte[10];
     ThreadLocalRandom.current().nextBytes(DATA);
     zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -83,7 +92,7 @@ public class TestReadOnlyZKClient {
     conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
     RO_ZK = new ReadOnlyZKClient(conf);
     // only connect when necessary
-    assertNull(RO_ZK.getZooKeeper());
+    assertNull(RO_ZK.zookeeper);
   }
 
   @AfterClass
@@ -93,17 +102,13 @@ public class TestReadOnlyZKClient {
     UTIL.cleanupTestDir();
   }
 
-  @Test
-  public void testGetAndExists() throws Exception {
-    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
-    assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
-    assertNotNull(RO_ZK.getZooKeeper());
+  private void waitForIdleConnectionClosed() throws Exception {
     // The zookeeper client should be closed finally after the keep alive time elapsed
     UTIL.waitFor(10000, new ExplainingPredicate<Exception>() {
 
       @Override
       public boolean evaluate() throws Exception {
-        return RO_ZK.getZooKeeper() == null;
+        return RO_ZK.zookeeper == null;
       }
 
       @Override
@@ -114,6 +119,14 @@ public class TestReadOnlyZKClient {
   }
 
   @Test
+  public void testGetAndExists() throws Exception {
+    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
+    assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
+    assertNotNull(RO_ZK.zookeeper);
+    waitForIdleConnectionClosed();
+  }
+
+  @Test
   public void testNoNode() throws InterruptedException, ExecutionException {
     String pathNotExists = PATH + "_whatever";
     try {
@@ -132,15 +145,39 @@ public class TestReadOnlyZKClient {
   @Test
   public void testSessionExpire() throws Exception {
     assertArrayEquals(DATA, RO_ZK.get(PATH).get());
-    ZooKeeper zk = RO_ZK.getZooKeeper();
+    ZooKeeper zk = RO_ZK.zookeeper;
     long sessionId = zk.getSessionId();
     UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId);
     // should not reach keep alive so still the same instance
-    assertSame(zk, RO_ZK.getZooKeeper());
-    byte [] got = RO_ZK.get(PATH).get();
+    assertSame(zk, RO_ZK.zookeeper);
+    byte[] got = RO_ZK.get(PATH).get();
     assertArrayEquals(DATA, got);
-    assertNotNull(RO_ZK.getZooKeeper());
-    assertNotSame(zk, RO_ZK.getZooKeeper());
-    assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId());
+    assertNotNull(RO_ZK.zookeeper);
+    assertNotSame(zk, RO_ZK.zookeeper);
+    assertNotEquals(sessionId, RO_ZK.zookeeper.getSessionId());
+  }
+
+  @Test
+  public void testNotCloseZkWhenPending() throws Exception {
+    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
+    ZooKeeper mockedZK = spy(RO_ZK.zookeeper);
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(new Answer<Object>() {
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        latch.await();
+        return invocation.callRealMethod();
+      }
+    }).when(mockedZK).exists(anyString(), anyBoolean(), any(StatCallback.class), any());
+    RO_ZK.zookeeper = mockedZK;
+    CompletableFuture<Stat> future = RO_ZK.exists(PATH);
+    // 2 * keep alive time to ensure that we will not close the zk when there are pending requests
+    Thread.sleep(6000);
+    assertNotNull(RO_ZK.zookeeper);
+    latch.countDown();
+    assertEquals(CHILDREN, future.get().getNumChildren());
+    // now we will close the idle connection.
+    waitForIdleConnectionClosed();
   }
 }