You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/11 04:56:00 UTC

[14/50] hbase git commit: HBASE-15727 Canary Tool for Zookeeper (churro morales)

HBASE-15727 Canary Tool for Zookeeper (churro morales)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e5d5308
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e5d5308
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e5d5308

Branch: refs/heads/hbase-12439
Commit: 7e5d530870f146dfdee52e5a228ad84f0aefafd7
Parents: cd25880
Author: tedyu <yu...@gmail.com>
Authored: Thu Jun 2 10:15:08 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Jun 2 10:15:08 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/tool/Canary.java    | 142 ++++++++++++++++++-
 .../hadoop/hbase/tool/TestCanaryTool.java       |  24 +++-
 2 files changed, 160 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7e5d5308/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index ab9971d..360b0f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -19,8 +19,14 @@
 
 package org.apache.hadoop.hbase.tool;
 
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
+
+import com.google.common.collect.Lists;
+
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,12 +38,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -78,20 +84,29 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * HBase Canary Tool, that that can be used to do
  * "canary monitoring" of a running HBase cluster.
  *
- * Here are two modes
+ * Here are three modes
  * 1. region mode - Foreach region tries to get one row per column family
  * and outputs some information about failure or latency.
  *
  * 2. regionserver mode - Foreach regionserver tries to get one row from one table
  * selected randomly and outputs some information about failure or latency.
+ *
+ * 3. zookeeper mode - for each zookeeper instance, selects a zNode and
+ * outputs some information about failure or latency.
  */
 public final class Canary implements Tool {
   // Sink interface used by the canary to outputs information
@@ -188,6 +203,101 @@ public final class Canary implements Tool {
     }
   }
 
+  /**
+   * Sink for zookeeper mode: counts read failures and logs failures and read timings.
+   */
+  public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
+    /**
+     * Counts and logs a failed read.
+     * @param zNode path of the znode whose read failed
+     * @param server zookeeper instance the read was issued against
+     */
+    @Override public void publishReadFailure(String zNode, String server) {
+      incReadFailureCount();
+      // State the failure explicitly; the message used to read like a successful read.
+      LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s failed",
+          zNode, server));
+    }
+
+    /**
+     * Logs how long a successful read took.
+     * @param znode path of the znode that was read
+     * @param server zookeeper instance the read was issued against
+     * @param msTime elapsed read time in milliseconds
+     */
+    @Override public void publishReadTiming(String znode, String server, long msTime) {
+      LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
+          znode, server, msTime));
+    }
+  }
+
+  /**
+   * Task that opens its own session to one zookeeper instance, reads one znode and reports the
+   * outcome to the sink.
+   */
+  static class ZookeeperTask implements Callable<Void> {
+    // Stored from the constructor; nothing in this class reads it.
+    private final Connection connection;
+    private final String host;
+    private final String znode;
+    private final int timeout;
+    private final ZookeeperStdOutSink sink;
+
+    /**
+     * @param connection cluster connection (kept, but not used by {@link #call()})
+     * @param host connect string of the zookeeper instance to probe
+     * @param znode path of the znode to read
+     * @param timeout session timeout handed to the zookeeper client, in milliseconds
+     * @param sink receiver of the read timing or read failure
+     */
+    public ZookeeperTask(Connection connection, String host, String znode, int timeout,
+        ZookeeperStdOutSink sink) {
+      this.connection = connection;
+      this.host = host;
+      this.znode = znode;
+      this.timeout = timeout;
+      this.sink = sink;
+    }
+
+    /**
+     * Connects to the instance, times a getData on the znode and publishes the timing; a
+     * KeeperException or an interruption is published as a read failure instead.
+     * @return always null
+     * @throws Exception anything else thrown while creating or closing the zookeeper client
+     */
+    @Override public Void call() throws Exception {
+      ZooKeeper zooKeeper = null;
+      boolean interrupted = false;
+      try {
+        zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
+        // Fetched outside the timed section; only getData is measured.
+        Stat exists = zooKeeper.exists(znode, false);
+        StopWatch stopwatch = new StopWatch();
+        stopwatch.start();
+        zooKeeper.getData(znode, false, exists);
+        stopwatch.stop();
+        sink.publishReadTiming(znode, host, stopwatch.getTime());
+      } catch (KeeperException e) {
+        sink.publishReadFailure(znode, host);
+      } catch (InterruptedException e) {
+        // Remember the interrupt so it can be restored once the client has been closed.
+        interrupted = true;
+        sink.publishReadFailure(znode, host);
+      } finally {
+        try {
+          if (zooKeeper != null) {
+            zooKeeper.close();
+          }
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+      return null;
+    }
+  }
+
   /**
    * For each column family of the region tries to get one row and outputs the latency, or the
    * failure.
@@ -462,6 +526,7 @@ public final class Canary implements Tool {
   private long timeout = DEFAULT_TIMEOUT;
   private boolean failOnError = true;
   private boolean regionServerMode = false;
+  private boolean zookeeperMode = false;
   private boolean regionServerAllRegions = false;
   private boolean writeSniffing = false;
   private boolean treatFailureAsError = false;
@@ -522,6 +587,8 @@ public final class Canary implements Tool {
             System.err.println("-interval needs a numeric value argument.");
             printUsageAndExit();
           }
+        } else if (cmd.equals("-zookeeper")) {
+          this.zookeeperMode = true;
         } else if(cmd.equals("-regionserver")) {
           this.regionServerMode = true;
         } else if(cmd.equals("-allRegions")) {
@@ -578,6 +645,13 @@ public final class Canary implements Tool {
       System.err.println("-allRegions can only be specified in regionserver mode.");
       printUsageAndExit();
     }
+    if (this.zookeeperMode) {
+      if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
+        System.err.println("-zookeeper is exclusive and cannot be combined with "
+            + "other modes.");
+        printUsageAndExit();
+      }
+    }
     return index;
   }
 
@@ -662,6 +736,8 @@ public final class Canary implements Tool {
     System.err.println("      which means to enable regionserver mode");
     System.err.println("   -allRegions    Tries all regions on a regionserver,");
     System.err.println("      only works in regionserver mode.");
+    System.err.println("   -zookeeper    Tries to grab zookeeper.znode.parent ");
+    System.err.println("      on each zookeeper instance");
     System.err.println("   -daemon        Continuous check at defined intervals.");
     System.err.println("   -interval <N>  Interval between checks (sec)");
     System.err.println("   -e             Use table/regionserver as regular expression");
@@ -700,6 +776,10 @@ public final class Canary implements Tool {
           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
               this.treatFailureAsError);
+    } else if (this.zookeeperMode) {
+      monitor =
+          new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
+              (ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
     } else {
       monitor =
           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
@@ -1040,6 +1120,78 @@ public final class Canary implements Tool {
     }
     return executor.invokeAll(tasks);
   }
+
+  /**
+   * Monitor for zookeeper mode: submits one {@link ZookeeperTask} per server of the zookeeper
+   * quorum, each reading the configured parent znode.
+   */
+  private static class ZookeeperMonitor extends Monitor {
+    private final List<String> hosts;
+    private final String znode;
+    private final int timeout;
+
+    /**
+     * Takes the parent znode, the session timeout and the quorum server list from the
+     * connection's configuration; all arguments are handed on to the Monitor constructor.
+     */
+    protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
+        ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError)  {
+      super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
+      Configuration configuration = connection.getConfiguration();
+      znode =
+          configuration.get(ZOOKEEPER_ZNODE_PARENT,
+              DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+      timeout = configuration
+          .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+      ConnectStringParser parser =
+          new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
+      hosts = Lists.newArrayList();
+      for (InetSocketAddress server : parser.getServerAddresses()) {
+        // Spell out "host:port" rather than relying on InetSocketAddress.toString(), whose
+        // layout is a display form and not guaranteed to be a usable connect string.
+        hosts.add(server.getHostString() + ":" + server.getPort());
+      }
+    }
+
+    /**
+     * Runs one read task per zookeeper server on the executor and waits for all of them. A task
+     * that throws, or an interruption while waiting, sets the error exit code.
+     */
+    @Override public void run() {
+      List<ZookeeperTask> tasks = Lists.newArrayList();
+      for (final String host : hosts) {
+        tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
+      }
+      try {
+        for (Future<Void> future : this.executor.invokeAll(tasks)) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            LOG.error("Sniff zookeeper failed!", e);
+            this.errorCode = ERROR_EXIT_CODE;
+          }
+        }
+      } catch (InterruptedException e) {
+        this.errorCode = ERROR_EXIT_CODE;
+        Thread.currentThread().interrupt();
+        LOG.error("Sniff zookeeper interrupted!", e);
+      }
+      this.done = true;
+    }
+
+    /**
+     * Returns the monitor's sink narrowed to the type the zookeeper tasks require.
+     * @throws RuntimeException if the configured sink is not a ZookeeperStdOutSink
+     */
+    private ZookeeperStdOutSink getSink() {
+      if (!(sink instanceof ZookeeperStdOutSink)) {
+        throw new RuntimeException("Can only write to zookeeper sink");
+      }
+      return ((ZookeeperStdOutSink) sink);
+    }
+  }
+
+
   // a monitor for regionserver mode
   private static class RegionServerMonitor extends Monitor {
 
@@ -1255,7 +1391,7 @@ public final class Canary implements Tool {
     new GenericOptionsParser(conf, args);
 
     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
-    LOG.info("Number of exection threads " + numThreads);
+    LOG.info("Number of execution threads " + numThreads);
 
     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e5d5308/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
index 755e5ba..fd67186 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.spi.LoggingEvent;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.HConstants;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,13 +42,11 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
-import static org.junit.Assert.*;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -79,6 +79,27 @@ public class TestCanaryTool {
   Appender mockAppender;
 
   @Test
+  public void testBasicZookeeperCanaryWorks() throws Exception {
+    // getOnlyElement yields null for an empty port list and throws if there are several.
+    Integer clientPort =
+        Iterables.getOnlyElement(testingUtility.getZkCluster().getClientPortList(), null);
+    String zkServer = "localhost:" + clientPort;
+    // Point the quorum at that single server for the canary run.
+    testingUtility.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM, zkServer + "/hbase");
+
+    Canary.ZookeeperStdOutSink zkSink = spy(new Canary.ZookeeperStdOutSink());
+    ExecutorService pool = new ScheduledThreadPoolExecutor(2);
+    Canary canary = new Canary(pool, zkSink);
+    ToolRunner.run(testingUtility.getConfiguration(), canary,
+        new String[] { "-t", "10000", "-zookeeper" });
+
+    // A timing must have been published for the parent znode on that server.
+    String parentZnode = testingUtility.getConfiguration()
+        .get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+    verify(zkSink, atLeastOnce()).publishReadTiming(eq(parentZnode), eq(zkServer), anyLong());
+  }
+
+  @Test
   public void testBasicCanaryWorks() throws Exception {
     TableName tableName = TableName.valueOf("testTable");
     Table table = testingUtility.createTable(tableName, new byte[][] { FAMILY });