You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/05/22 21:27:24 UTC
hbase git commit: HBASE-13712 Backport HBASE-13199 to branch-1
Repository: hbase
Updated Branches:
refs/heads/0.98 036c684e1 -> e8d8ca74e
HBASE-13712 Backport HBASE-13199 to branch-1
0.98 backport. Includes:
HBASE-13199 Some small improvements on canary tool (Shaohui Liu)
HBASE-13199 ADDENDUM Some small improvements on canary tool (Shaohui Liu)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8d8ca74
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8d8ca74
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8d8ca74
Branch: refs/heads/0.98
Commit: e8d8ca74e553479efdd75f81f705c88a7ec66947
Parents: 036c684
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri May 22 12:13:27 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri May 22 12:13:27 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/tool/Canary.java | 496 +++++++++++++------
1 file changed, 339 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e8d8ca74/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 8107027..f55bc3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -19,14 +19,23 @@
package org.apache.hadoop.hbase.tool;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -46,10 +55,14 @@ import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -113,6 +126,171 @@ public final class Canary implements Tool {
}
}
+ /**
+ * For each column family of the region tries to get one row and outputs the latency, or the
+ * failure.
+ */
+ static class RegionTask implements Callable<Void> {
+ private HConnection connection;
+ private HRegionInfo region;
+ private Sink sink;
+
+ RegionTask(HConnection connection, HRegionInfo region, Sink sink) {
+ this.connection = connection;
+ this.region = region;
+ this.sink = sink;
+ }
+
+ @Override
+ public Void call() {
+ HTableInterface table = null;
+ HTableDescriptor tableDesc = null;
+ try {
+ table = connection.getTable(region.getTable());
+ tableDesc = table.getTableDescriptor();
+ } catch (IOException e) {
+ LOG.debug("sniffRegion failed", e);
+ sink.publishReadFailure(region, e);
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException ioe) {
+ }
+ }
+ return null;
+ }
+
+ byte[] startKey = null;
+ Get get = null;
+ Scan scan = null;
+ ResultScanner rs = null;
+ StopWatch stopWatch = new StopWatch();
+ for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
+ stopWatch.reset();
+ startKey = region.getStartKey();
+ // Can't do a get on empty start row so do a Scan of first element if any instead.
+ if (startKey.length > 0) {
+ get = new Get(startKey);
+ get.setCacheBlocks(false);
+ get.setFilter(new FirstKeyOnlyFilter());
+ get.addFamily(column.getName());
+ } else {
+ scan = new Scan();
+ scan.setCaching(1);
+ scan.setCacheBlocks(false);
+ scan.setFilter(new FirstKeyOnlyFilter());
+ scan.addFamily(column.getName());
+ scan.setMaxResultSize(1L);
+ }
+
+ try {
+ if (startKey.length > 0) {
+ stopWatch.start();
+ table.get(get);
+ stopWatch.stop();
+ sink.publishReadTiming(region, column, stopWatch.getTime());
+ } else {
+ stopWatch.start();
+ rs = table.getScanner(scan);
+ stopWatch.stop();
+ sink.publishReadTiming(region, column, stopWatch.getTime());
+ }
+ } catch (Exception e) {
+ sink.publishReadFailure(region, column, e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ scan = null;
+ get = null;
+ startKey = null;
+ }
+ }
+ try {
+ table.close();
+ } catch (IOException e) {
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Get one row from a region on the regionserver and outputs the latency, or the failure.
+ */
+ static class RegionServerTask implements Callable<Void> {
+ private HConnection connection;
+ private String serverName;
+ private HRegionInfo region;
+ private ExtendedSink sink;
+
+ RegionServerTask(HConnection connection, String serverName, HRegionInfo region,
+ ExtendedSink sink) {
+ this.connection = connection;
+ this.serverName = serverName;
+ this.region = region;
+ this.sink = sink;
+ }
+
+ @Override
+ public Void call() {
+ TableName tableName = null;
+ HTableInterface table = null;
+ Get get = null;
+ byte[] startKey = null;
+ Scan scan = null;
+ StopWatch stopWatch = new StopWatch();
+ // monitor one region on every region server
+ stopWatch.reset();
+ try {
+ tableName = region.getTable();
+ table = connection.getTable(tableName);
+ startKey = region.getStartKey();
+ // Can't do a get on empty start row so do a Scan of first element if any instead.
+ if (startKey.length > 0) {
+ get = new Get(startKey);
+ get.setCacheBlocks(false);
+ get.setFilter(new FirstKeyOnlyFilter());
+ stopWatch.start();
+ table.get(get);
+ stopWatch.stop();
+ } else {
+ scan = new Scan();
+ scan.setCacheBlocks(false);
+ scan.setFilter(new FirstKeyOnlyFilter());
+ scan.setCaching(1);
+ scan.setMaxResultSize(1L);
+ stopWatch.start();
+ ResultScanner s = table.getScanner(scan);
+ s.close();
+ stopWatch.stop();
+ }
+ sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
+ } catch (TableNotFoundException tnfe) {
+ // This is ignored because it doesn't imply that the regionserver is dead
+ } catch (TableNotEnabledException tnee) {
+ // This is considered a success since we got a response.
+ LOG.debug("The targeted table was disabled. Assuming success.");
+ } catch (DoNotRetryIOException dnrioe) {
+ sink.publishReadFailure(tableName.getNameAsString(), serverName);
+ LOG.error(dnrioe);
+ } catch (IOException e) {
+ sink.publishReadFailure(tableName.getNameAsString(), serverName);
+ LOG.error(e);
+ } finally {
+ if (table != null) {
+ try {
+ table.close();
+ } catch (IOException e) {/* DO NOTHING */
+ }
+ }
+ scan = null;
+ get = null;
+ startKey = null;
+ }
+ return null;
+ }
+ }
+
private static final int USAGE_EXIT_CODE = 1;
private static final int INIT_ERROR_EXIT_CODE = 2;
private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
@@ -122,6 +300,8 @@ public final class Canary implements Tool {
private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
+ private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
+
private static final Log LOG = LogFactory.getLog(Canary.class);
private Configuration conf = null;
@@ -132,12 +312,14 @@ public final class Canary implements Tool {
private long timeout = DEFAULT_TIMEOUT;
private boolean failOnError = true;
private boolean regionServerMode = false;
+ private ExecutorService executor; // threads to retrieve data from regionservers
public Canary() {
- this(new RegionServerStdOutSink());
+ this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}
- public Canary(Sink sink) {
+ public Canary(ExecutorService executor, Sink sink) {
+ this.executor = executor;
this.sink = sink;
}
@@ -227,54 +409,65 @@ public final class Canary implements Tool {
}
}
- // launches chore for refreshing kerberos ticket if security is enabled.
+ // Launches chore for refreshing kerberos credentials if security is enabled.
+ // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
+ // for more details.
AuthUtil.launchAuthChore(conf);
- // start to prepare the stuffs
+ // Start to prepare the stuffs
Monitor monitor = null;
Thread monitorThread = null;
long startTime = 0;
long currentTimeLength = 0;
+ // Get a connection to use in below.
+ HConnection connection = HConnectionManager.createConnection(this.conf);
+ try {
+ do {
+ // Do monitor !!
+ try {
+ monitor = this.newMonitor(connection, index, args);
+ monitorThread = new Thread(monitor);
+ startTime = System.currentTimeMillis();
+ monitorThread.start();
+ while (!monitor.isDone()) {
+ // wait for 1 sec
+ Thread.sleep(1000);
+ // exit if any error occurs
+ if (this.failOnError && monitor.hasError()) {
+ monitorThread.interrupt();
+ if (monitor.initialized) {
+ System.exit(monitor.errorCode);
+ } else {
+ System.exit(INIT_ERROR_EXIT_CODE);
+ }
+ }
+ currentTimeLength = System.currentTimeMillis() - startTime;
+ if (currentTimeLength > this.timeout) {
+ LOG.error("The monitor is running too long (" + currentTimeLength
+ + ") after timeout limit:" + this.timeout
+ + " will be killed itself !!");
+ if (monitor.initialized) {
+ System.exit(TIMEOUT_ERROR_EXIT_CODE);
+ } else {
+ System.exit(INIT_ERROR_EXIT_CODE);
+ }
+ break;
+ }
+ }
- do {
- // do monitor !!
- monitor = this.newMonitor(index, args);
- monitorThread = new Thread(monitor);
- startTime = System.currentTimeMillis();
- monitorThread.start();
- while (!monitor.isDone()) {
- // wait for 1 sec
- Thread.sleep(1000);
- // exit if any error occurs
- if (this.failOnError && monitor.hasError()) {
- monitorThread.interrupt();
- if (monitor.initialized) {
+ if (this.failOnError && monitor.hasError()) {
+ monitorThread.interrupt();
System.exit(monitor.errorCode);
- } else {
- System.exit(INIT_ERROR_EXIT_CODE);
- }
- }
- currentTimeLength = System.currentTimeMillis() - startTime;
- if (currentTimeLength > this.timeout) {
- LOG.error("The monitor is running too long (" + currentTimeLength
- + ") after timeout limit:" + this.timeout
- + " will be killed itself !!");
- if (monitor.initialized) {
- System.exit(TIMEOUT_ERROR_EXIT_CODE);
- } else {
- System.exit(INIT_ERROR_EXIT_CODE);
}
- break;
+ } finally {
+ if (monitor != null) monitor.close();
}
- }
-
- if (this.failOnError && monitor.hasError()) {
- monitorThread.interrupt();
- System.exit(monitor.errorCode);
- }
- Thread.sleep(interval);
- } while (interval > 0);
+ Thread.sleep(interval);
+ } while (interval > 0);
+ } finally {
+ connection.close();
+ }
return(monitor.errorCode);
}
@@ -298,13 +491,13 @@ public final class Canary implements Tool {
}
/**
- * a Factory method for {@link Monitor}.
- * Can be overrided by user.
+ * A Factory method for {@link Monitor}.
+ * Can be overridden by user.
* @param index a start index for monitor target
* @param args args passed from user
* @return a Monitor instance
*/
- public Monitor newMonitor(int index, String[] args) {
+ public Monitor newMonitor(final HConnection connection, int index, String[] args) {
Monitor monitor = null;
String[] monitorTargets = null;
@@ -314,22 +507,21 @@ public final class Canary implements Tool {
System.arraycopy(args, index, monitorTargets, 0, length);
}
- if(this.regionServerMode) {
- monitor = new RegionServerMonitor(
- this.conf,
- monitorTargets,
- this.useRegExp,
- (ExtendedSink)this.sink);
+ if (this.regionServerMode) {
+ monitor =
+ new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
+ (ExtendedSink) this.sink, this.executor);
} else {
- monitor = new RegionMonitor(this.conf, monitorTargets, this.useRegExp, this.sink);
+ monitor =
+ new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor);
}
return monitor;
}
// a Monitor super-class can be extended by users
- public static abstract class Monitor implements Runnable {
+ public static abstract class Monitor implements Runnable, Closeable {
- protected Configuration config;
+ protected HConnection connection;
protected HBaseAdmin admin;
protected String[] targets;
protected boolean useRegExp;
@@ -338,6 +530,7 @@ public final class Canary implements Tool {
protected boolean done = false;
protected int errorCode = 0;
protected Sink sink;
+ protected ExecutorService executor;
public boolean isDone() {
return done;
@@ -347,15 +540,20 @@ public final class Canary implements Tool {
return errorCode != 0;
}
- protected Monitor(Configuration config, String[] monitorTargets,
- boolean useRegExp, Sink sink) {
- if (null == config)
- throw new IllegalArgumentException("config shall not be null");
+ @Override
+ public void close() throws IOException {
+ if (this.admin != null) this.admin.close();
+ }
+
+ protected Monitor(HConnection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
+ ExecutorService executor) {
+ if (null == connection) throw new IllegalArgumentException("connection shall not be null");
- this.config = config;
+ this.connection = connection;
this.targets = monitorTargets;
this.useRegExp = useRegExp;
this.sink = sink;
+ this.executor = executor;
}
public abstract void run();
@@ -363,7 +561,7 @@ public final class Canary implements Tool {
protected boolean initAdmin() {
if (null == this.admin) {
try {
- this.admin = new HBaseAdmin(config);
+ this.admin = new HBaseAdmin(connection);
} catch (Exception e) {
LOG.error("Initial HBaseAdmin failed...", e);
this.errorCode = INIT_ERROR_EXIT_CODE;
@@ -379,23 +577,31 @@ public final class Canary implements Tool {
// a monitor for region mode
private static class RegionMonitor extends Monitor {
- public RegionMonitor(Configuration config, String[] monitorTargets,
- boolean useRegExp, Sink sink) {
- super(config, monitorTargets, useRegExp, sink);
+ public RegionMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp,
+ Sink sink, ExecutorService executor) {
+ super(connection, monitorTargets, useRegExp, sink, executor);
}
@Override
public void run() {
- if(this.initAdmin()) {
+ if (this.initAdmin()) {
try {
+ List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
if (this.targets != null && this.targets.length > 0) {
String[] tables = generateMonitorTables(this.targets);
this.initialized = true;
for (String table : tables) {
- Canary.sniff(admin, sink, table);
+ taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
}
} else {
- sniff();
+ taskFutures.addAll(sniff());
+ }
+ for (Future<Void> future : taskFutures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ LOG.error("Sniff region failed!", e);
+ }
}
} catch (Exception e) {
LOG.error("Run regionMonitor failed", e);
@@ -408,7 +614,7 @@ public final class Canary implements Tool {
private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
String[] returnTables = null;
- if(this.useRegExp) {
+ if (this.useRegExp) {
Pattern pattern = null;
HTableDescriptor[] tds = null;
Set<String> tmpTables = new TreeSet<String>();
@@ -422,16 +628,15 @@ public final class Canary implements Tool {
}
}
}
- } catch(IOException e) {
+ } catch (IOException e) {
LOG.error("Communicate with admin failed", e);
throw e;
}
- if(tmpTables.size() > 0) {
+ if (tmpTables.size() > 0) {
returnTables = tmpTables.toArray(new String[tmpTables.size()]);
} else {
- String msg = "No HTable found, tablePattern:"
- + Arrays.toString(monitorTargets);
+ String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
LOG.error(msg);
this.errorCode = INIT_ERROR_EXIT_CODE;
throw new TableNotFoundException(msg);
@@ -446,12 +651,15 @@ public final class Canary implements Tool {
/*
* canary entry point to monitor all the tables.
*/
- private void sniff() throws Exception {
+ private List<Future<Void>> sniff() throws Exception {
+ List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
for (HTableDescriptor table : admin.listTables()) {
- Canary.sniff(admin, sink, table);
+ if (admin.isTableEnabled(table.getTableName())) {
+ taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
+ }
}
+ return taskFutures;
}
-
}
/**
@@ -459,47 +667,49 @@ public final class Canary implements Tool {
* @throws Exception
*/
public static void sniff(final HBaseAdmin admin, TableName tableName) throws Exception {
- sniff(admin, new StdOutSink(), tableName.getNameAsString());
+ List<Future<Void>> taskFutures =
+ Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
+ new ScheduledThreadPoolExecutor(1));
+ for (Future<Void> future : taskFutures) {
+ future.get();
+ }
}
/**
* Canary entry point for specified table.
* @throws Exception
*/
- private static void sniff(final HBaseAdmin admin, final Sink sink, String tableName)
- throws Exception {
- if (admin.isTableAvailable(tableName)) {
- sniff(admin, sink, admin.getTableDescriptor(tableName.getBytes()));
+ private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink, String tableName,
+ ExecutorService executor) throws Exception {
+ if (admin.isTableEnabled(TableName.valueOf(tableName))) {
+ return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
+ executor);
} else {
- LOG.warn(String.format("Table %s is not available", tableName));
+ LOG.warn(String.format("Table %s is not enabled", tableName));
}
+ return new LinkedList<Future<Void>>();
}
/*
* Loops over regions that owns this table, and output some information abouts the state.
*/
- private static void sniff(final HBaseAdmin admin, final Sink sink, HTableDescriptor tableDesc)
- throws Exception {
- HTable table = null;
-
+ private static List<Future<Void>> sniff(final HBaseAdmin admin, final Sink sink,
+ HTableDescriptor tableDesc, ExecutorService executor) throws Exception {
+ HTableInterface table = null;
try {
- table = new HTable(admin.getConfiguration(), tableDesc.getName());
+ table = admin.getConnection().getTable(tableDesc.getTableName());
} catch (TableNotFoundException e) {
- return;
+ return new ArrayList<Future<Void>>();
}
-
+ List<RegionTask> tasks = new ArrayList<RegionTask>();
try {
- for (HRegionInfo region : admin.getTableRegions(tableDesc.getName())) {
- try {
- sniffRegion(admin, sink, region, table);
- } catch (Exception e) {
- sink.publishReadFailure(region, e);
- LOG.debug("sniffRegion failed", e);
- }
+ for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
+ tasks.add(new RegionTask(admin.getConnection(), region, sink));
}
} finally {
table.close();
}
+ return executor.invokeAll(tasks);
}
/*
@@ -510,7 +720,7 @@ public final class Canary implements Tool {
final HBaseAdmin admin,
final Sink sink,
HRegionInfo region,
- HTable table) throws Exception {
+ HTableInterface table) throws Exception {
HTableDescriptor tableDesc = table.getTableDescriptor();
byte[] startKey = null;
Get get = null;
@@ -560,12 +770,12 @@ public final class Canary implements Tool {
}
}
}
- //a monitor for regionserver mode
+ // a monitor for regionserver mode
private static class RegionServerMonitor extends Monitor {
- public RegionServerMonitor(Configuration config, String[] monitorTargets,
- boolean useRegExp, ExtendedSink sink) {
- super(config, monitorTargets, useRegExp, sink);
+ public RegionServerMonitor(HConnection connection, String[] monitorTargets, boolean useRegExp,
+ ExtendedSink sink, ExecutorService executor) {
+ super(connection, monitorTargets, useRegExp, sink, executor);
}
private ExtendedSink getSink() {
@@ -613,62 +823,27 @@ public final class Canary implements Tool {
}
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
- String serverName = null;
- String tableName = null;
- HRegionInfo region = null;
- HTable table = null;
- Get get = null;
- byte[] startKey = null;
- Scan scan = null;
- StopWatch stopWatch = new StopWatch();
+ List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
+ Random rand =new Random();
// monitor one region on every region server
for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
- stopWatch.reset();
- serverName = entry.getKey();
- // always get the first region
- region = entry.getValue().get(0);
- try {
- tableName = region.getTable().getNameAsString();
- table = new HTable(this.admin.getConfiguration(), tableName);
- startKey = region.getStartKey();
- // Can't do a get on empty start row so do a Scan of first element if any instead.
- if(startKey.length > 0) {
- get = new Get(startKey);
- stopWatch.start();
- table.get(get);
- stopWatch.stop();
- } else {
- scan = new Scan();
- scan.setCaching(1);
- scan.setMaxResultSize(1L);
- stopWatch.start();
- table.getScanner(scan);
- stopWatch.stop();
- }
- this.getSink().publishReadTiming(tableName, serverName, stopWatch.getTime());
- } catch (TableNotFoundException tnfe) {
- // This is ignored because it doesn't imply that the regionserver is dead
- } catch (TableNotEnabledException tnee) {
- // This is considered a success since we got a response.
- LOG.debug("The targeted table was disabled. Assuming success.");
- } catch (DoNotRetryIOException dnrioe) {
- this.getSink().publishReadFailure(tableName, serverName);
- LOG.error(dnrioe);
- } catch (IOException e) {
- this.getSink().publishReadFailure(tableName, serverName);
- LOG.error(e);
- this.errorCode = ERROR_EXIT_CODE;
- } finally {
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {/* DO NOTHING */
- }
+ String serverName = entry.getKey();
+ // random select a region
+ HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
+ tasks.add(new RegionServerTask(this.connection, serverName, region, getSink()));
+ }
+ try {
+ for (Future<Void> future : this.executor.invokeAll(tasks)) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ LOG.error("Sniff regionserver failed!", e);
+ this.errorCode = ERROR_EXIT_CODE;
}
- scan = null;
- get = null;
- startKey = null;
}
+ } catch (InterruptedException e) {
+ this.errorCode = ERROR_EXIT_CODE;
+ LOG.error("Sniff regionserver failed!", e);
}
}
@@ -680,18 +855,16 @@ public final class Canary implements Tool {
private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
- HTable table = null;
+ HTableInterface table = null;
try {
HTableDescriptor[] tableDescs = this.admin.listTables();
List<HRegionInfo> regions = null;
for (HTableDescriptor tableDesc : tableDescs) {
- table = new HTable(this.admin.getConfiguration(), tableDesc.getName());
-
- for (Map.Entry<HRegionInfo, ServerName> entry : table
- .getRegionLocations().entrySet()) {
- ServerName rs = entry.getValue();
+ table = this.admin.getConnection().getTable(tableDesc.getTableName());
+ for (Entry<HRegionInfo, ServerName> e: ((HTable)table).getRegionLocations().entrySet()) {
+ HRegionInfo r = e.getKey();
+ ServerName rs = e.getValue();
String rsName = rs.getHostname();
- HRegionInfo r = entry.getKey();
if (rsAndRMap.containsKey(rsName)) {
regions = rsAndRMap.get(rsName);
@@ -735,7 +908,7 @@ public final class Canary implements Tool {
if (this.useRegExp) {
regExpFound = false;
pattern = Pattern.compile(rsName);
- for (Map.Entry<String,List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
+ for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
matcher = pattern.matcher(entry.getKey());
if (matcher.matches()) {
filteredRsAndRMap.put(entry.getKey(), entry.getValue());
@@ -762,7 +935,16 @@ public final class Canary implements Tool {
public static void main(String[] args) throws Exception {
final Configuration conf = HBaseConfiguration.create();
- int exitCode = ToolRunner.run(conf, new Canary(), args);
+ AuthUtil.launchAuthChore(conf);
+ int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
+ ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
+
+ Class<? extends Sink> sinkClass =
+ conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class);
+ Sink sink = ReflectionUtils.newInstance(sinkClass);
+
+ int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
+ executor.shutdown();
System.exit(exitCode);
}
}