You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC
svn commit: r1576909 [4/18] - in /hbase/branches/0.89-fb/src: ./
examples/thrift/ main/java/org/apache/hadoop/hbase/
main/java/org/apache/hadoop/hbase/avro/
main/java/org/apache/hadoop/hbase/avro/generated/
main/java/org/apache/hadoop/hbase/client/ mai...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Mar 12 21:17:13 2014
@@ -19,26 +19,46 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.ipc.*;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.thrift.transport.TTransportException;
import java.io.EOFException;
import java.io.IOException;
@@ -46,11 +66,33 @@ import java.io.InterruptedIOException;
import java.io.SyncFailedException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -81,31 +123,27 @@ public class HConnectionManager {
super();
}
- private static String ZK_INSTANCE_NAME =
- HConnectionManager.class.getSimpleName();
-
- private static final int MAX_CACHED_HBASE_INSTANCES=31;
+ private static final int MAX_CACHED_HBASE_INSTANCES = 31;
// A LRU Map of master HBaseConfiguration -> connection information for that
// instance. The objects it contains are mutable and hence require
- // synchronized access to them. We set instances to 31. The zk default max
+ // synchronized access to them. We set instances to 31. The zk default max
// connections is 30 so should run into zk issues before hit this value of 31.
- private static
- final Map<Integer, TableServers> HBASE_INSTANCES =
- new LinkedHashMap<Integer, TableServers>
- ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) {
- @Override
- protected boolean removeEldestEntry(Map.Entry<Integer, TableServers> eldest) {
- return size() > MAX_CACHED_HBASE_INSTANCES;
- }
+ private static final Map<Integer, TableServers> HBASE_INSTANCES = new LinkedHashMap<Integer, TableServers>(
+ (int) (MAX_CACHED_HBASE_INSTANCES / 0.75F) + 1, 0.75F, true) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<Integer, TableServers> eldest) {
+ return size() > MAX_CACHED_HBASE_INSTANCES;
+ }
};
- private static final Map<String, ClientZKConnection> ZK_WRAPPERS =
- new HashMap<String, ClientZKConnection>();
+ private static final Map<String, ClientZKConnection> ZK_WRAPPERS = new HashMap<String, ClientZKConnection>();
/**
* Get the connection object for the instance specified by the configuration
* If no current connection exists, create a new connection for that instance
- * @param conf configuration
+ *
+ * @param conf
+ * configuration
* @return HConnection object for the instance specified by the configuration
*/
public static HConnection getConnection(Configuration conf) {
@@ -123,11 +161,13 @@ public class HConnectionManager {
/**
* Delete connection information for the instance specified by configuration
- * @param conf configuration
- * @param stopProxy stop the proxy as well
+ *
+ * @param conf
+ * configuration
+ * @param stopProxy
+ * stop the proxy as well
*/
- public static void deleteConnectionInfo(Configuration conf,
- boolean stopProxy) {
+ public static void deleteConnectionInfo(Configuration conf, boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
Integer key = HBaseConfiguration.hashCode(conf);
TableServers t = HBASE_INSTANCES.remove(key);
@@ -147,9 +187,14 @@ public class HConnectionManager {
t.close();
}
}
- HBaseRPC.stopClients ();
+ HBaseRPC.stopClients();
}
deleteAllZookeeperConnections();
+ try {
+ HBaseThriftRPC.clearAll();
+ } catch (Exception e) {
+ // Exiting
+ }
}
/**
@@ -183,160 +228,31 @@ public class HConnectionManager {
}
/**
- * This class is responsible to handle connection and reconnection to a
- * zookeeper quorum.
- *
- */
- public static class ClientZKConnection implements Abortable, Watcher {
-
- static final Log LOG = LogFactory.getLog(ClientZKConnection.class);
- private ZooKeeperWrapper zooKeeperWrapper;
- private Configuration conf;
- private boolean aborted = false;
- private int reconnectionTimes = 0;
- private int maxReconnectionTimes = 0;
-
- /**
- * Create a ClientZKConnection
- *
- * @param conf
- * configuration
- */
- public ClientZKConnection(Configuration conf) {
- this.conf = conf;
- maxReconnectionTimes =
- conf.getInt("hbase.client.max.zookeeper.reconnection", 3);
- }
-
- /**
- * Get the zookeeper wrapper for this connection, instantiate it if
- * necessary.
- *
- * @return zooKeeperWrapper
- * @throws java.io.IOException
- * if a remote or network exception occurs
- */
- public synchronized ZooKeeperWrapper getZooKeeperWrapper()
- throws IOException {
- if (zooKeeperWrapper == null) {
- if (this.reconnectionTimes < this.maxReconnectionTimes) {
- zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf,
- ZK_INSTANCE_NAME, this);
- } else {
- String msg = "HBase client failed to connection to zk after "
- + maxReconnectionTimes + " attempts";
- LOG.fatal(msg);
- throw new IOException(msg);
- }
- }
- return zooKeeperWrapper;
- }
-
- /**
- * Close this connection to zookeeper.
- */
- private synchronized void closeZooKeeperConnection() {
- if (zooKeeperWrapper != null) {
- zooKeeperWrapper.close();
- zooKeeperWrapper = null;
- }
- }
-
- /**
- * Reset this connection to zookeeper.
- *
- * @throws IOException
- * If there is any exception when reconnect to zookeeper
- */
- private synchronized void resetZooKeeperConnection() throws IOException {
- // close the zookeeper connection first
- closeZooKeeperConnection();
- // reconnect to zookeeper
- zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, ZK_INSTANCE_NAME,
- this);
- }
-
- @Override
- public synchronized void process(WatchedEvent event) {
- LOG.debug("Received ZK WatchedEvent: " +
- "[path=" + event.getPath() + "] " +
- "[state=" + event.getState().toString() + "] " +
- "[type=" + event.getType().toString() + "]");
- if (event.getType() == EventType.None &&
- event.getState() == KeeperState.SyncConnected) {
- LOG.info("Reconnected to ZooKeeper");
- // reset the reconnection times
- reconnectionTimes = 0;
- }
- }
-
- @Override
- public void abort(final String msg, Throwable t) {
- if (t != null && t instanceof KeeperException.SessionExpiredException) {
- try {
- reconnectionTimes++;
- LOG.info("This client just lost it's session with ZooKeeper, "
- + "trying the " + reconnectionTimes + " times to reconnect.");
- // reconnect to zookeeper if possible
- resetZooKeeperConnection();
-
- LOG.info("Reconnected successfully. This disconnect could have been"
- + " caused by a network partition or a long-running GC pause,"
- + " either way it's recommended that you verify your "
- + "environment.");
- this.aborted = false;
- return;
- } catch (IOException e) {
- LOG.error("Could not reconnect to ZooKeeper after session"
- + " expiration, aborting");
- t = e;
- }
- }
- if (t != null)
- LOG.fatal(msg, t);
- else
- LOG.fatal(msg);
-
- this.aborted = true;
- closeZooKeeperConnection();
- }
-
- @Override
- public boolean isAborted() {
- return this.aborted;
- }
- }
-
- /**
* It is provided for unit test cases which verify the behavior of region
* location cache prefetch.
+ *
* @return Number of cached regions for the table.
*/
- static int getCachedRegionCount(Configuration conf,
- byte[] tableName) {
- TableServers connection = (TableServers)getConnection(conf);
+ static int getCachedRegionCount(Configuration conf, byte[] tableName) {
+ TableServers connection = (TableServers) getConnection(conf);
return connection.getNumberOfCachedRegionLocations(tableName);
}
/**
* It's provided for unit test cases which verify the behavior of region
* location cache prefetch.
+ *
* @return true if the region where the table and row reside is cached.
*/
- static boolean isRegionCached(Configuration conf,
- byte[] tableName, byte[] row) {
- TableServers connection = (TableServers)getConnection(conf);
+ static boolean isRegionCached(Configuration conf, byte[] tableName, byte[] row) {
+ TableServers connection = (TableServers) getConnection(conf);
return connection.isRegionCached(tableName, row);
}
/* Encapsulates finding the servers for an HBase instance */
- static class TableServers implements ServerConnection {
+ public static class TableServers implements ServerConnection {
static final Log LOG = LogFactory.getLog(TableServers.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
- private final long pause;
- private final int numRetries;
- private final int rpcTimeout;
- private final long rpcRetryTimeout;
private final int prefetchRegionLimit;
private final Object masterLock = new Object();
@@ -349,26 +265,27 @@ public class HConnectionManager {
private final Object userRegionLock = new Object();
private volatile Configuration conf;
+ private final HConnectionParams params;
// Used by master and region servers during safe mode only
private volatile HRegionLocation rootRegionLocation;
- private final Map<Integer, ConcurrentSkipListMap<byte [], HRegionLocation>>
- cachedRegionLocations = new ConcurrentHashMap<Integer,
- ConcurrentSkipListMap<byte [], HRegionLocation>>();
+ private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations = new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
- // amount of time to wait before we consider a server to be in fast fail mode
+ // Amount of time to wait before we consider a server to be in
+ // fast-fail mode.
protected long fastFailThresholdMilliSec;
// Keeps track of failures when we cannot talk to a server. Helps in
// fast failing clients if the server is down for a long time.
- protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap =
- new ConcurrentHashMap<HServerAddress, FailureInfo>();
- // We populate repeatedFailuresMap every time there is a failure. So, to keep it
+ protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<HServerAddress, FailureInfo>();
+ // We populate repeatedFailuresMap every time there is a failure. So, to
+ // keep it
// from growing unbounded, we garbage collect the failure information
// every cleanupInterval.
protected final long failureMapCleanupIntervalMilliSec;
protected volatile long lastFailureMapCleanupTimeMilliSec;
- // Amount of time that has to pass, before we clear region -> regionserver cache
+ // Amount of time that has to pass before we clear the
+ // region -> regionserver cache
// again, when in fast fail mode. This is used to clean unused entries.
protected long cacheClearingTimeoutMilliSec;
// clear failure Info. Used to clean out all entries.
@@ -377,8 +294,7 @@ public class HConnectionManager {
private long fastFailClearingTimeMilliSec;
private final boolean recordClientContext;
- private ThreadLocal<List<OperationContext>> operationContextPerThread =
- new ThreadLocal<List<OperationContext>>();
+ private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
public void resetOperationContext() {
if (!recordClientContext || this.operationContextPerThread == null) {
@@ -403,8 +319,8 @@ public class HConnectionManager {
return null;
}
- ArrayList<OperationContext> context =
- new ArrayList<OperationContext>(currContext);
+ ArrayList<OperationContext> context = new ArrayList<OperationContext>(
+ currContext);
// Made a copy, clear the context
currContext.clear();
@@ -414,6 +330,7 @@ public class HConnectionManager {
/**
* Keeps track of repeated failures to any region server.
+ *
* @author amitanand.s
*
*/
@@ -432,23 +349,26 @@ public class HConnectionManager {
// Used to keep track of concurrent attempts to contact the server.
// In Fast fail mode, we want just one client thread to try to connect
// the rest of the client threads will fail fast.
- private final AtomicBoolean
- exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(false);
+ private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
+ false);
public String toString() {
- return "FailureInfo: numConsecutiveFailures = " + numConsecutiveFailures
- + " timeOfFirstFailureMilliSec = " + timeOfFirstFailureMilliSec
- + " timeOfLatestAttemptMilliSec = " + timeOfLatestAttemptMilliSec
- + " timeOfLatestCacheClearMilliSec = " + timeOfLatestCacheClearMilliSec
- + " exclusivelyRetringInspiteOfFastFail = " + exclusivelyRetringInspiteOfFastFail.get();
+ return "FailureInfo: numConsecutiveFailures = "
+ + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
+ + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
+ + timeOfLatestAttemptMilliSec
+ + " timeOfLatestCacheClearMilliSec = "
+ + timeOfLatestCacheClearMilliSec
+ + " exclusivelyRetringInspiteOfFastFail = "
+ + exclusivelyRetringInspiteOfFastFail.get();
}
FailureInfo(long firstFailureTime) {
this.timeOfFirstFailureMilliSec = firstFailureTime;
}
}
- private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
- new ThreadLocal<MutableBoolean>();
+
+ private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<MutableBoolean>();
// For TESTING purposes only;
public Map<HServerAddress, FailureInfo> getFailureMap() {
@@ -477,6 +397,7 @@ public class HConnectionManager {
Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
private int batchedUploadSoftFlushRetries;
private long batchedUploadSoftFlushTimeoutMillis;
+ private final boolean useThrift;
private ConcurrentSkipListSet<byte[]> initializedTableSet =
new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
@@ -487,6 +408,9 @@ public class HConnectionManager {
@SuppressWarnings("unchecked")
public TableServers(Configuration conf) {
this.conf = conf;
+ params = HConnectionParams.getInstance(conf);
+ this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
+ HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
String serverClassName =
conf.get(HConstants.REGION_SERVER_CLASS,
@@ -503,19 +427,12 @@ public class HConnectionManager {
"Unable to find region server interface " + serverClassName, e);
}
- this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- this.numRetries = conf.getInt("hbase.client.retries.number", 10);
- this.maxServerRequestedRetries =
- conf.getInt("hbase.client.server.requested.retries.max", 0);
- this.rpcTimeout = conf.getInt(
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- this.rpcRetryTimeout = conf.getLong("hbase.client.rpc.retry.timeout",
- Long.MAX_VALUE);
- this.cacheClearingTimeoutMilliSec = conf.getLong("hbase.client.fastfail.cache.clear.interval",
+ // TODO move parameters below into HConnectionParams
+ this.cacheClearingTimeoutMilliSec = conf.getLong(
+ "hbase.client.fastfail.cache.clear.interval",
10000); // 10 sec
- this.fastFailThresholdMilliSec = conf.getLong("hbase.client.fastfail.threshold",
+ this.fastFailThresholdMilliSec = conf.getLong(
+ "hbase.client.fastfail.threshold",
60000); // 1 min
this.failureMapCleanupIntervalMilliSec = conf.getLong(
"hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
@@ -538,13 +455,6 @@ public class HConnectionManager {
}
- private long getPauseTime(int tries) {
- int ntries = tries;
- if (ntries >= HConstants.RETRY_BACKOFF.length)
- ntries = HConstants.RETRY_BACKOFF.length - 1;
- return this.pause * HConstants.RETRY_BACKOFF[ntries];
- }
-
// Used by master and region servers during safe mode only
public void unsetRootRegionLocation() {
this.rootRegionLocation = null;
@@ -569,20 +479,17 @@ public class HConnectionManager {
HServerAddress masterLocation = null;
synchronized (this.masterLock) {
- for (int tries = 0;
- !this.closed &&
- !this.masterChecked && this.master == null &&
- tries < numRetries;
- tries++) {
+ for (int tries = 0; !this.closed && !this.masterChecked
+ && this.master == null && tries < params.getNumRetries(); tries++) {
try {
masterLocation = zk.readMasterAddress(zk);
if (masterLocation != null) {
- HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
- HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
- masterLocation.getInetSocketAddress(), this.conf,
- (int)this.rpcTimeout, HBaseRPCOptions.DEFAULT);
+ HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy(
+ HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
+ masterLocation.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), HBaseRPCOptions.DEFAULT);
if (tryMaster.isMasterRunning()) {
this.master = tryMaster;
@@ -592,20 +499,23 @@ public class HConnectionManager {
}
} catch (IOException e) {
- if (tries == numRetries - 1) {
+ if (tries == params.getNumRetries() - 1) {
// This was our last chance - don't bother sleeping
- LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
- " failed; no more retrying.", e);
+ LOG.info(
+ "getMaster attempt " + tries + " of "
+ + params.getNumRetries() + " failed; no more retrying.",
+ e);
break;
}
- LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
- " failed; retrying after sleep of " +
- getPauseTime(tries), e);
+ LOG.info(
+ "getMaster attempt " + tries + " of " + params.getNumRetries()
+ + " failed; retrying after sleep of "
+ + params.getPauseTime(tries), e);
}
// Cannot connect to master or it is not running. Sleep & retry
try {
- this.masterLock.wait(getPauseTime(tries));
+ this.masterLock.wait(params.getPauseTime(tries));
} catch (InterruptedException e) {
// continue
}
@@ -633,8 +543,8 @@ public class HConnectionManager {
return true;
}
- public boolean tableExists(final byte [] tableName)
- throws MasterNotRunningException {
+ public boolean tableExists(final byte[] tableName)
+ throws MasterNotRunningException {
getMaster();
if (tableName == null) {
throw new IllegalArgumentException("Table name cannot be null");
@@ -658,23 +568,22 @@ public class HConnectionManager {
/*
* @param n
- * @return Truen if passed tablename <code>n</code> is equal to the name
- * of a catalog table.
+ *
+ * @return true if the passed table name <code>n</code> is the name of
+ * a catalog table.
*/
- private static boolean isMetaTableName(final byte [] n) {
+ private static boolean isMetaTableName(final byte[] n) {
return MetaUtils.isMetaTableName(n);
}
- public HRegionLocation getRegionLocation(final byte [] name,
- final byte [] row, boolean reload)
- throws IOException {
- return reload? relocateRegion(name, row): locateRegion(name, row);
+ public HRegionLocation getRegionLocation(final byte[] name,
+ final byte[] row, boolean reload) throws IOException {
+ return reload ? relocateRegion(name, row) : locateRegion(name, row);
}
public HTableDescriptor[] listTables() throws IOException {
getMaster();
- final TreeSet<HTableDescriptor> uniqueTables =
- new TreeSet<HTableDescriptor>();
+ final TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
MetaScannerVisitor visitor = new MetaScannerVisitor() {
public boolean processRow(Result result) throws IOException {
try {
@@ -734,15 +643,12 @@ public class HConnectionManager {
}
/*
- * If online == true
- * Returns true if all regions are online
- * Returns false in any other case
- * If online == false
- * Returns true if all regions are offline
- * Returns false in any other case
+ * If online == true: returns true if all regions are online.
+ * If online == false: returns true if all regions are offline.
+ * Returns false in any other case (including when no rows are scanned).
*/
private boolean testTableOnlineState(byte[] tableName, boolean online)
- throws IOException {
+ throws IOException {
if (!tableExists(tableName)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
@@ -752,30 +658,29 @@ public class HConnectionManager {
}
int rowsScanned = 0;
int rowsOffline = 0;
- byte[] startKey =
- HRegionInfo.createRegionName(tableName, null, HConstants.ZEROES, false);
+ byte[] startKey = HRegionInfo.createRegionName(tableName, null,
+ HConstants.ZEROES, false);
byte[] endKey;
HRegionInfo currentRegion;
Scan scan = new Scan(startKey);
- scan.addColumn(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
+ scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
scan.setCaching(rows);
- ScannerCallable s = new ScannerCallable(this,
- (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
- HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME),
- scan, HBaseRPCOptions.DEFAULT);
+ ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName,
+ HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME
+ : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT);
try {
// Open scanner
getRegionServerWithRetries(s);
do {
currentRegion = s.getHRegionInfo();
Result r;
- Result [] rrs;
- while ((rrs = getRegionServerWithRetries(s)) != null && rrs.length > 0) {
+ Result[] rrs;
+ while ((rrs = getRegionServerWithRetries(s)) != null
+ && rrs.length > 0) {
r = rrs[0];
- byte [] value = r.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
+ byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.REGIONINFO_QUALIFIER);
if (value != null) {
HRegionInfo info = Writables.getHRegionInfoOrNull(value);
if (info != null) {
@@ -787,8 +692,8 @@ public class HConnectionManager {
}
}
endKey = currentRegion.getEndKey();
- } while (!(endKey == null ||
- Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)));
+ } while (!(endKey == null || Bytes.equals(endKey,
+ HConstants.EMPTY_BYTE_ARRAY)));
} finally {
s.setClose();
// Doing below will call 'next' again and this will close the scanner
@@ -796,35 +701,38 @@ public class HConnectionManager {
getRegionServerWithRetries(s);
}
LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
- boolean onOffLine = online? rowsOffline == 0: rowsOffline == rowsScanned;
+ boolean onOffLine = online ? rowsOffline == 0
+ : rowsOffline == rowsScanned;
return rowsScanned > 0 && onOffLine;
}
- private static class HTableDescriptorFinder
- implements MetaScanner.MetaScannerVisitor {
- byte[] tableName;
- HTableDescriptor result;
- protected HTableDescriptorFinder(byte[] tableName) {
- this.tableName = tableName;
- }
- public boolean processRow(Result rowResult) throws IOException {
- HRegionInfo info = Writables.getHRegionInfo(
- rowResult.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER));
- HTableDescriptor desc = info.getTableDesc();
- if (Bytes.compareTo(desc.getName(), tableName) == 0) {
- result = desc;
- return false;
- }
- return true;
- }
- HTableDescriptor getResult() {
- return result;
+ private static class HTableDescriptorFinder implements
+ MetaScanner.MetaScannerVisitor {
+ byte[] tableName;
+ HTableDescriptor result;
+
+ protected HTableDescriptorFinder(byte[] tableName) {
+ this.tableName = tableName;
+ }
+
+ public boolean processRow(Result rowResult) throws IOException {
+ HRegionInfo info = Writables.getHRegionInfo(rowResult.getValue(
+ HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+ HTableDescriptor desc = info.getTableDesc();
+ if (Bytes.compareTo(desc.getName(), tableName) == 0) {
+ result = desc;
+ return false;
}
+ return true;
+ }
+
+ HTableDescriptor getResult() {
+ return result;
+ }
}
public HTableDescriptor getHTableDescriptor(final byte[] tableName)
- throws IOException {
+ throws IOException {
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
}
@@ -840,27 +748,25 @@ public class HConnectionManager {
return result;
}
- public HRegionLocation locateRegion(final byte [] tableName,
- final byte [] row)
- throws IOException{
+ public HRegionLocation locateRegion(final byte[] tableName, final byte[] row)
+ throws IOException {
return locateRegion(tableName, row, true);
}
- public HRegionLocation relocateRegion(final byte [] tableName,
- final byte [] row)
- throws IOException{
+ public HRegionLocation relocateRegion(final byte[] tableName,
+ final byte[] row) throws IOException {
return locateRegion(tableName, row, false);
}
- private HRegionLocation locateRegion(final byte [] tableName,
- final byte [] row, boolean useCache)
- throws IOException{
+ private HRegionLocation locateRegion(final byte[] tableName,
+ final byte[] row, boolean useCache) throws IOException {
if (tableName == null || tableName.length == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
}
-
- if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+ if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+ return locateMetaInRoot(row, useCache, metaRegionLock);
+ } else if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
synchronized (rootRegionLock) {
// This block guards against two threads trying to find the root
// region at the same time. One will go do the find while the
@@ -873,9 +779,6 @@ public class HConnectionManager {
}
return this.rootRegionLocation;
}
- } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
- return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
- useCache, metaRegionLock);
} else {
// Region not in the cache - have to go to the meta RS
return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
@@ -883,9 +786,9 @@ public class HConnectionManager {
}
}
- private void prefetchRegionCache(final byte[] tableName,
+ private HRegionLocation prefetchRegionCache(final byte[] tableName,
final byte[] row) {
- prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
+ return prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
}
/*
@@ -893,7 +796,7 @@ public class HConnectionManager {
* row we're seeking. It will prefetch certain number of regions info and
* save them to the global region cache.
*/
- private void prefetchRegionCache(final byte[] tableName,
+ private HRegionLocation prefetchRegionCache(final byte[] tableName,
final byte[] row, int prefetchRegionLimit) {
// Implement a new visitor for MetaScanner, and use it to walk through
// the .META.
@@ -909,8 +812,7 @@ public class HConnectionManager {
regionInfo = Writables.getHRegionInfo(value);
// possible we got a region of a different table...
- if (!Bytes.equals(regionInfo.getTableDesc().getName(),
- tableName)) {
+ if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
return false; // stop scanning
}
if (regionInfo.isOffline()) {
@@ -920,20 +822,20 @@ public class HConnectionManager {
value = result.getValue(HConstants.CATALOG_FAMILY,
HConstants.SERVER_QUALIFIER);
if (value == null) {
- return true; // don't cache it
+ return true; // don't cache it
}
final String serverAddress = Bytes.toString(value);
value = result.getValue(HConstants.CATALOG_FAMILY,
HConstants.STARTCODE_QUALIFIER);
long serverStartCode = -1;
- if(value != null) {
+ if (value != null) {
serverStartCode = Bytes.toLong(value);
}
// instantiate the location
HRegionLocation loc = new HRegionLocation(regionInfo,
- new HServerAddress(serverAddress), serverStartCode);
+ new HServerAddress(serverAddress), serverStartCode);
// cache this meta entry
cacheLocation(tableName, loc);
}
@@ -946,22 +848,22 @@ public class HConnectionManager {
try {
// pre-fetch certain number of regions info at region cache.
MetaScanner.metaScan(conf, visitor, tableName, row, prefetchRegionLimit);
+ return getCachedLocation(tableName, row);
} catch (IOException e) {
LOG.warn("Encounted problems when prefetch META table: ", e);
}
+ return null;
}
- /*
- * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
- * info that contains the table and row we're seeking.
+ /**
+ * Search the meta table (.META.) for the HRegionLocation info that
+ * contains the table and row we're seeking.
*/
private HRegionLocation locateRegionInMeta(final byte [] parentTable,
final byte [] tableName, final byte [] row, boolean useCache,
Object regionLockObject)
throws IOException {
HRegionLocation location;
- // If we are supposed to be using the cache, look in the cache to see if
- // we already have the region.
if (useCache) {
location = getCachedLocation(tableName, row);
if (location != null) {
@@ -969,15 +871,19 @@ public class HConnectionManager {
}
}
+ // If we are supposed to be using the cache, look in the cache to see if
+ // we already have the region.
+
// build the key of the meta region we should be looking for.
// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
HConstants.NINES, false);
for (int tries = 0; true; tries++) {
- if (tries >= numRetries) {
+ if (tries >= params.getNumRetries()) {
throw new NoServerForRegionException("Unable to find region for "
- + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+ + Bytes.toStringBinary(row) + " after " + params.getNumRetries() +
+ " tries.");
}
FailureInfo fInfo = null;
@@ -1007,127 +913,316 @@ public class HConnectionManager {
}
didTry = true;
+ HBaseThriftRPC.isMeta.get().push(true);
Result regionInfoRow = null;
- // This block guards against two threads trying to load the meta
- // region at the same time. The first will load the meta region and
- // the second will use the value that the first one found.
- synchronized (regionLockObject) {
- // Check the cache again for a hit in case some other thread made the
- // same query while we were waiting on the lock. If not supposed to
- // be using the cache, delete any existing cached location so it won't
- // interfere.
- if (useCache) {
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
+ try {
+ // This block guards against two threads trying to load the meta
+ // region at the same time. The first will load the meta region and
+ // the second will use the value that the first one found.
+ synchronized (regionLockObject) {
+ // Check the cache again for a hit in case some other thread made the
+ // same query while we were waiting on the lock. If not supposed to
+ // be using the cache, delete any existing cached location so it won't
+ // interfere.
+ if (useCache) {
+ location = getCachedLocation(tableName, row);
+ if (location != null) {
+ return location;
+ }
+ } else {
+ LOG.debug("Deleting the client location cache.");
+ deleteCachedLocation(tableName, row, null);
}
- } else {
- deleteCachedLocation(tableName, row, null);
- }
- // If the parent table is META, we may want to pre-fetch some
- // region info into the global region cache for this table.
- if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
+ // If the parent table is META, we may want to pre-fetch some
+ // region info into the global region cache for this table.
+ if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
(getRegionCachePrefetch(tableName)) ) {
- prefetchRegionCache(tableName, row);
- }
+ LOG.debug("Prefetching the client location cache.");
+ location = prefetchRegionCache(tableName, row);
+ if (location != null) {
+ return location;
+ }
+ }
- HRegionInterface serverInterface =
- getHRegionConnection(metaLocation.getServerAddress());
+ HRegionInterface serverInterface =
+ getHRegionConnection(metaLocation.getServerAddress());
- // Query the root or meta region for the location of the meta region
- regionInfoRow = serverInterface.getClosestRowBefore(
- metaLocation.getRegionInfo().getRegionName(), metaKey,
- HConstants.CATALOG_FAMILY);
+ // Query the root or meta region for the location of the meta region
+ regionInfoRow = serverInterface.getClosestRowBefore(
+ metaLocation.getRegionInfo().getRegionName(), metaKey,
+ HConstants.CATALOG_FAMILY);
}
- if (regionInfoRow == null) {
- throw new TableNotFoundException(Bytes.toString(tableName));
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ HBaseThriftRPC.isMeta.get().pop();
+ }
+ location = getLocationFromRow(regionInfoRow, tableName,
+ parentTable, row);
+ cacheLocation(tableName, location);
+ return location;
+ } catch (TableNotFoundException e) {
+ // if we got this error, probably means the table just plain doesn't
+ // exist. rethrow the error immediately. this should always be coming
+ // from the HTable constructor.
+ throw e;
+ } catch (PreemptiveFastFailException e) {
+ // already processed this. Don't process this again.
+ throw e;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler
+ .decodeRemoteException((RemoteException) e);
+ } else if (isNetworkException(e)) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server, e);
+ }
+ if (tries < params.getNumRetries() - 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("locateRegionInMeta attempt " + tries + " of "
+ + params.getNumRetries()
+ + " failed; retrying after sleep of "
+ + params.getPauseTime(tries) + " because: " + e.getMessage());
}
- byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- if (value == null || value.length == 0) {
- throw new IOException("HRegionInfo was null or empty in " +
- Bytes.toString(parentTable) + ", row=" + regionInfoRow);
- }
- // convert the row result into the HRegionLocation we need!
- HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
- value, new HRegionInfo());
- // possible we got a region of a different table...
- if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
- throw new TableNotFoundException(
- "Table '" + Bytes.toString(tableName) + "' was not found.");
- }
- if (regionInfo.isOffline()) {
- throw new RegionOfflineException("region offline: " +
- regionInfo.getRegionNameAsString());
- }
-
- value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- String serverAddress = "";
- if(value != null) {
- serverAddress = Bytes.toString(value);
- }
- if (serverAddress.equals("")) {
- throw new NoServerForRegionException("No server address listed " +
- "in " + Bytes.toString(parentTable) + " for region " +
- regionInfo.getRegionNameAsString() + " containing row " +
- Bytes.toStringBinary(row));
- }
-
- value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER);
- long serverStartCode = -1;
- if(value != null) {
- serverStartCode = Bytes.toLong(value);
- }
- // instantiate the location
- location = new HRegionLocation(regionInfo,
- new HServerAddress(serverAddress),
- serverStartCode);
- cacheLocation(tableName, location);
- return location;
- } catch (TableNotFoundException e) {
- // if we got this error, probably means the table just plain doesn't
- // exist. rethrow the error immediately. this should always be coming
- // from the HTable constructor.
+ } else {
throw e;
- } catch (PreemptiveFastFailException e) {
- // already processed this. Don't process this again.
+ }
+ // Only relocate the parent region if necessary
+ if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
+ relocateRegion(parentTable, row);
+ }
+ } catch (Exception e) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server, e);
+ if (tries < params.getNumRetries() - 1) {
+ LOG.debug("locateRegionInMeta attempt " + tries + " of "
+ + params.getNumRetries() + " failed; retrying after sleep of "
+ + params.getPauseTime(tries) + " because: " + e.getMessage());
+ } else {
throw e;
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
- } else if (isNetworkException(e)) {
- couldNotCommunicateWithServer = true;
- handleFailureToServer(server, e);
+ }
+ } finally {
+ updateFailureInfoForServer(server, fInfo, didTry,
+ couldNotCommunicateWithServer, retryDespiteFastFailMode);
+ }
+ try {
+ Thread.sleep(params.getPauseTime(tries));
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ }
+
+  /**
+   * Converts a catalog row (in this file, the result of
+   * <code>getClosestRowBefore</code> on a catalog region) into an
+   * {@link HRegionLocation}, validating the row along the way.
+   *
+   * @param regionInfoRow the catalog row; may be null
+   * @param tableName the table the located region is expected to belong to
+   * @param parentTable the catalog table the row was read from; used only in
+   *          error messages
+   * @param row the row being located; used only in error messages
+   * @return a location built from the row's region info, server address and
+   *         start code (-1 when no start code is recorded)
+   * @throws TableNotFoundException if <code>regionInfoRow</code> is null or
+   *           the region belongs to a different table
+   * @throws RegionOfflineException if the region is marked offline
+   * @throws NoServerForRegionException if no server address is recorded
+   * @throws IOException if the HRegionInfo column is missing or empty
+   */
+  private HRegionLocation getLocationFromRow(Result regionInfoRow,
+      byte[] tableName, byte[] parentTable, byte[] row) throws IOException {
+    if (regionInfoRow == null) {
+      throw new TableNotFoundException(Bytes.toString(tableName));
+    }
+    byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.REGIONINFO_QUALIFIER);
+    if (value == null || value.length == 0) {
+      throw new IOException("HRegionInfo was null or empty in "
+          + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
+    }
+    // convert the row result into the HRegionLocation we need!
+    HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(value,
+        new HRegionInfo());
+    // possible we got a region of a different table...
+    if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+      throw new TableNotFoundException("Table '" + Bytes.toString(tableName)
+          + "' was not found.");
+    }
+    if (regionInfo.isOffline()) {
+      throw new RegionOfflineException("region offline: "
+          + regionInfo.getRegionNameAsString());
+    }
+
+    // A missing server column is treated the same as an empty address.
+    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.SERVER_QUALIFIER);
+    String serverAddress = "";
+    if (value != null) {
+      serverAddress = Bytes.toString(value);
+    }
+    if (serverAddress.equals("")) {
+      throw new NoServerForRegionException("No server address listed "
+          + "in " + Bytes.toString(parentTable) + " for region "
+          + regionInfo.getRegionNameAsString() + " containing row "
+          + Bytes.toStringBinary(row));
+    }
+
+    // The start code is optional; -1 is used when the row has none.
+    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+        HConstants.STARTCODE_QUALIFIER);
+    long serverStartCode = -1;
+    if (value != null) {
+      serverStartCode = Bytes.toLong(value);
+    }
+    // instantiate the location
+    HRegionLocation location = new HRegionLocation(regionInfo,
+        new HServerAddress(serverAddress), serverStartCode);
+    return location;
+  }
+
+  /**
+   * Searches the -ROOT- table for the location of the .META. region that
+   * contains <code>row</code>, retrying up to
+   * <code>params.getNumRetries()</code> times and sleeping
+   * <code>params.getPauseTime(tries)</code> between failed attempts.
+   * Successful lookups are stored in the client location cache.
+   *
+   * TODO: much of this duplicates
+   * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)};
+   * the two retry loops should be unified.
+   *
+   * @param row key in .META. whose enclosing .META. region is wanted
+   * @param useCache if true, cached locations (including the cached -ROOT-
+   *          location) may be used; if false, the cached entry for
+   *          <code>row</code> is deleted and -ROOT- is re-located
+   * @param regionLockObject currently unused; <code>metaRegionLock</code>
+   *          serializes the lookup instead
+   * @return the location of the .META. region containing <code>row</code>
+   * @throws TableNotFoundException if -ROOT- has no matching .META. region;
+   *           rethrown without retrying
+   * @throws PreemptiveFastFailException if the -ROOT- server is in fast-fail
+   *           mode and this thread was not chosen to retry; rethrown without
+   *           retrying
+   * @throws IOException the failure of the last permitted attempt
+   */
+  private HRegionLocation locateMetaInRoot(final byte[] row,
+      boolean useCache, Object regionLockObject) throws IOException {
+    HRegionLocation location;
+    final byte[] parentTable = HConstants.ROOT_TABLE_NAME;
+    final byte[] tableName = HConstants.META_TABLE_NAME;
+    // If we are supposed to be using the cache, look in the cache to see if
+    // we already have the region.
+    if (useCache) {
+      location = getCachedLocation(tableName, row);
+      if (location != null) {
+        return location;
+      }
+    }
+
+    // build the key of the meta region we should be looking for.
+    // the extra 9's on the end are necessary to allow "exact" matches
+    // without knowing the precise region names.
+    byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
+        HConstants.NINES, false);
+    for (int tries = 0; true; tries++) {
+      if (tries >= params.getNumRetries()) {
+        throw new NoServerForRegionException("Unable to find region for "
+            + Bytes.toStringBinary(row) + " after " + params.getNumRetries()
+            + " tries.");
+      }
+
+      TableServers.FailureInfo fInfo = null;
+      HServerAddress server = null;
+      boolean didTry = false;
+      boolean couldNotCommunicateWithServer = false;
+      boolean retryDespiteFastFailMode = false;
+      // Records whether this attempt pushed onto HBaseThriftRPC.isMeta, so
+      // the finally block pops only what this attempt pushed. Exceptions
+      // thrown before the push (fast-fail, a failed root lookup whose own
+      // finally already popped) must not pop again.
+      boolean pushedIsMeta = false;
+      try {
+        // locate the root or meta region
+        HRegionLocation metaLocation = null;
+        if (useCache && rootRegionLocation != null
+            && !inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+          metaLocation = rootRegionLocation;
+        } else {
+          synchronized (rootRegionLock) {
+            // This block guards against two threads trying to find the root
+            // region at the same time. One will go do the find while the
+            // second waits. The second thread will not do find.
+
+            if (!useCache || rootRegionLocation == null
+                || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+              HBaseThriftRPC.isMeta.get().push(true);
+              try {
+                this.rootRegionLocation = locateRootRegion();
+              } finally {
+                HBaseThriftRPC.isMeta.get().pop();
+              }
+              LOG.info("Updated rootRegionLocation from ZK to : "
+                  + this.rootRegionLocation);
+            }
+            metaLocation = this.rootRegionLocation;
}
-          if (tries < numRetries - 1) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("locateRegionInMeta attempt " + tries + " of " +
-                this.numRetries + " failed; retrying after sleep of " +
-                getPauseTime(tries) + " because: " + e.getMessage());
+        }
+        Preconditions.checkNotNull(metaLocation);
+        server = metaLocation.getServerAddress();
+        fInfo = repeatedFailuresMap.get(server);
+
+        // Handle the case where .META. is on an unresponsive server.
+        if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) {
+          // In Fast-fail mode, all but one thread will fast fail. Check
+          // if we are that one chosen thread.
+
+          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+          if (!retryDespiteFastFailMode) { // we don't have to retry
+            throw new PreemptiveFastFailException(
+                fInfo.numConsecutiveFailures.get(),
+                fInfo.timeOfFirstFailureMilliSec,
+                fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+          }
+        }
+        didTry = true;
+        HBaseThriftRPC.isMeta.get().push(true);
+        pushedIsMeta = true;
+        Result regionInfoRow = null;
+        // This block guards against two threads trying to load the meta
+        // region at the same time. The first will load the meta region and
+        // the second will use the value that the first one found.
+        synchronized (metaRegionLock) {
+          if (useCache) {
+            location = getCachedLocation(tableName, row);
+            if (location != null) {
+              return location;
}
} else {
-              throw e;
+            LOG.debug("Deleting the client location cache.");
+            deleteCachedLocation(tableName, row, null);
}
-          // Only relocate the parent region if necessary
-          if(!(e instanceof RegionOfflineException ||
-              e instanceof NoServerForRegionException)) {
-            relocateRegion(parentTable, metaKey);
-          }
-        } finally {
-          updateFailureInfoForServer(server, fInfo, didTry,
-              couldNotCommunicateWithServer, retryDespiteFastFailMode);
+          HRegionInterface serverInterface = getHRegionConnection(metaLocation
+              .getServerAddress());
+
+          // Query the root for the location of the meta region
+          regionInfoRow = serverInterface.getClosestRowBefore(metaLocation
+              .getRegionInfo().getRegionName(), metaKey,
+              HConstants.CATALOG_FAMILY);
+          location = getLocationFromRow(regionInfoRow, tableName,
+              parentTable, row);
+          cacheLocation(tableName, location);
}
-      try{
-        Thread.sleep(getPauseTime(tries));
-      } catch (InterruptedException e){
-        // continue
+        return location;
+      } catch (TableNotFoundException e) {
+        // if we got this error, probably means the table just plain doesn't
+        // exist. rethrow the error immediately. this should always be coming
+        // from the HTable constructor.
+        throw e;
+      } catch (PreemptiveFastFailException e) {
+        // already processed this. Don't process this again.
+        throw e;
+      } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler
+              .decodeRemoteException((RemoteException) e);
+        } else if (isNetworkException(e)) {
+          couldNotCommunicateWithServer = true;
+          handleFailureToServer(server, e);
+        }
+        if (tries < params.getNumRetries() - 1) {
+          LOG.debug("IOException locateMetaInRoot attempt " + tries
+              + " of " + params.getNumRetries()
+              + " failed; retrying after sleep of "
+              + params.getPauseTime(tries) + " because: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        couldNotCommunicateWithServer = true;
+        handleFailureToServer(server, e);
+        if (tries < params.getNumRetries() - 1) {
+          LOG.debug("Exception locateMetaInRoot attempt " + tries + " of "
+              + params.getNumRetries() + " failed; retrying after sleep of "
+              + params.getPauseTime(tries) + " because: " + e.getMessage());
+        } else {
+          throw e;
}
+      } finally {
+        // Pop only if this attempt pushed (see pushedIsMeta above).
+        if (pushedIsMeta) {
+          HBaseThriftRPC.isMeta.get().pop();
+        }
+        updateFailureInfoForServer(server, fInfo, didTry,
+            couldNotCommunicateWithServer, retryDespiteFastFailMode);
+      }
+      try {
+        Thread.sleep(params.getPauseTime(tries));
+      } catch (InterruptedException e) {
+        // continue
}
}
+  }
+
+
+
/**
* Check if the exception is something that indicates that we cannot
@@ -1144,7 +1239,8 @@ public class HConnectionManager {
e instanceof ConnectException ||
e instanceof ClosedChannelException ||
e instanceof SyncFailedException ||
- e instanceof EOFException);
+ e instanceof EOFException ||
+ e instanceof TTransportException);
}
public Collection<HRegionLocation> getCachedHRegionLocations(final byte [] tableName,
@@ -1184,19 +1280,19 @@ public class HConnectionManager {
HRegionLocation rl = tableLocations.get(row);
if (rl != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Cache hit for row <" +
- Bytes.toStringBinary(row) +
- "> in tableName " + Bytes.toString(tableName) +
- ": location server " + rl.getServerAddress() +
- ", location region name " +
- rl.getRegionInfo().getRegionNameAsString());
+ LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row)
+ + "> in tableName " + Bytes.toString(tableName)
+ + ": location server " + rl.getServerAddress()
+ + ", location region name "
+ + rl.getRegionInfo().getRegionNameAsString());
}
return rl;
}
// get the matching region for the row
- Entry<byte [], HRegionLocation> entry = tableLocations.floorEntry(row);
- HRegionLocation possibleRegion = (entry == null)?null:entry.getValue();
+ Entry<byte[], HRegionLocation> entry = tableLocations.floorEntry(row);
+ HRegionLocation possibleRegion = (entry == null) ? null : entry
+ .getValue();
// we need to examine the cached location to verify that it is
// a match by end key as well.
@@ -1208,9 +1304,9 @@ public class HConnectionManager {
// this one. the exception case is when the endkey is
// HConstants.EMPTY_START_ROW, signifying that the region we're
// checking is actually the last region in the table.
- if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
- KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
- row, 0, row.length) > 0) {
+ if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
+ || KeyValue.getRowComparator(tableName).compareRows(endKey, 0,
+ endKey.length, row, 0, row.length) > 0) {
return possibleRegion;
}
}
@@ -1220,31 +1316,30 @@ public class HConnectionManager {
}
/*
- * Delete a cached location for the specified table name and row
- * if it is located on the (optionally) specified old location.
+ * Delete a cached location for the specified table name and row if it is
+ * located on the (optionally) specified old location.
*/
- public void deleteCachedLocation(final byte [] tableName,
- final byte [] row, HServerAddress oldServer) {
+ @Override
+ public void deleteCachedLocation(final byte[] tableName, final byte[] row,
+ HServerAddress oldServer) {
synchronized (this.cachedRegionLocations) {
- Map<byte [], HRegionLocation> tableLocations =
- getTableLocations(tableName);
+ Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
// start to examine the cache. we can only do cache actions
// if there's something in the cache for this table.
if (!tableLocations.isEmpty()) {
HRegionLocation rl = getCachedLocation(tableName, row);
if (rl != null) {
- // If oldLocation is specified. deleteLocation only if it is the same.
- if (oldServer != null
- && !oldServer.equals(rl.getServerAddress()))
+ // If oldLocation is specified. deleteLocation only if it is the
+ // same.
+ if (oldServer != null && !oldServer.equals(rl.getServerAddress()))
return; // perhaps, some body else cleared and repopulated.
tableLocations.remove(rl.getRegionInfo().getStartKey());
if (LOG.isDebugEnabled()) {
- LOG.debug("Removed " +
- rl.getRegionInfo().getRegionNameAsString() +
- " for tableName=" + Bytes.toString(tableName) +
- " from cache " + "because of " + Bytes.toStringBinary(row));
+ LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString()
+ + " for tableName=" + Bytes.toString(tableName)
+ + " from cache " + "because of " + Bytes.toStringBinary(row));
}
}
}
@@ -1255,18 +1350,17 @@ public class HConnectionManager {
* Delete all cached entries of a table that maps to a specific location.
*
* @param tablename
+ *
* @param server
*/
- protected void clearCachedLocationForServer(
- final String server) {
+ protected void clearCachedLocationForServer(final String server) {
boolean deletedSomething = false;
-
synchronized (this.cachedRegionLocations) {
if (!cachedServers.contains(server)) {
return;
}
- for (Map<byte[], HRegionLocation> tableLocations :
- cachedRegionLocations.values()) {
+ for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations
+ .values()) {
for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
if (e.getValue().getServerAddress().toString().equals(server)) {
tableLocations.remove(e.getKey());
@@ -1283,20 +1377,21 @@ public class HConnectionManager {
/*
* @param tableName
+ *
* @return Map of cached locations for passed <code>tableName</code>
*/
- private ConcurrentSkipListMap<byte [], HRegionLocation> getTableLocations(
- final byte [] tableName) {
+ private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
+ final byte[] tableName) {
// find the map of cached locations for this table
Integer key = Bytes.mapKey(tableName);
- ConcurrentSkipListMap<byte [], HRegionLocation> result =
- this.cachedRegionLocations.get(key);
+ ConcurrentSkipListMap<byte[], HRegionLocation> result = this.cachedRegionLocations
+ .get(key);
if (result == null) {
synchronized (this.cachedRegionLocations) {
result = this.cachedRegionLocations.get(key);
if (result == null) {
// if tableLocations for this table isn't built yet, make one
- result = new ConcurrentSkipListMap<byte [], HRegionLocation>(
+ result = new ConcurrentSkipListMap<byte[], HRegionLocation>(
Bytes.BYTES_COMPARATOR);
this.cachedRegionLocations.put(key, result);
}
@@ -1333,25 +1428,84 @@ public class HConnectionManager {
}
}
+ @Override
public HRegionInterface getHRegionConnection(
HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
throws IOException {
if (getMaster) {
+ LOG.debug("Getting master");
getMaster();
}
- HRegionInterface server = HRegionServer.getMainRS(regionServer);
- if (server != null) {
+ HRegionInterface server = HRegionServer.getMainRS(regionServer);
+ if (server != null && !this.useThrift) {
return server;
}
+ final boolean thriftPortWrittenToMeta = conf.getBoolean(
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT);
+ final boolean hadoopPortWrittenToMeta = !thriftPortWrittenToMeta;
+
try {
// establish an RPC for this RS
// set hbase.ipc.client.connect.max.retries to retry connection
// attempts
- server = (HRegionInterface) HBaseRPC.getProxy(
- serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
- regionServer.getInetSocketAddress(), this.conf,
- this.rpcTimeout, options);
+ if (this.useThrift) {
+ Class<? extends ThriftClientInterface> serverInterface =
+ ThriftHRegionInterface.class;
+ if (thriftPortWrittenToMeta) {
+ try {
+ server = (HRegionInterface) HBaseThriftRPC.getClient(
+ regionServer.getInetSocketAddress(), this.conf,
+ serverInterface, options);
+ } catch (Exception e) {
+ LOG.warn("Exception connecting to the region server on" +
+ "the thrift channel. Retrying on the HadoopRPC port", e);
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_PORT,
+ HConstants.DEFAULT_REGIONSERVER_PORT));
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID, addr, this.conf,
+ params.getRpcTimeout(), options);
+ }
+ } else {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_SWIFT_PORT,
+ HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT));
+ server = (HRegionInterface) HBaseThriftRPC.getClient(addr,
+ this.conf, serverInterface, options);
+ } catch (Exception e) {
+ LOG.warn("Exception connecting to the region server on" +
+ "the thrift channel. Retrying on the HadoopRPC port", e);
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), options);
+ }
+ }
+ } else {
+ if (hadoopPortWrittenToMeta) {
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), options);
+ } else {
+ // The hadoop port is no longer written to Meta (this will happen
+ // when we are reasonably confident about the thrift service, and
+ // will soon deprecate RPC). So, try to connect to the default port.
+ // TODO gauravm: Verify that this works (t2830553)
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_PORT,
+ HConstants.DEFAULT_REGIONSERVER_PORT));
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID, addr, this.conf,
+ params.getRpcTimeout(), options);
+ }
+ }
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
@@ -1359,21 +1513,19 @@ public class HConnectionManager {
return server;
}
-  public HRegionInterface getHRegionConnection(
-      HServerAddress regionServer, HBaseRPCOptions options)
-      throws IOException {
+  /**
+   * Returns a connection to <code>regionServer</code> using the given RPC
+   * options. Delegates to the three-argument overload with
+   * <code>getMaster</code> set to false.
+   */
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+      HBaseRPCOptions options) throws IOException {
return getHRegionConnection(regionServer, false, options);
}
-  public HRegionInterface getHRegionConnection(
-      HServerAddress regionServer, boolean getMaster)
-      throws IOException {
-    return getHRegionConnection(regionServer, getMaster, HBaseRPCOptions.DEFAULT);
+  /**
+   * Returns a connection to <code>regionServer</code> using
+   * <code>HBaseRPCOptions.DEFAULT</code>. Delegates to the three-argument
+   * overload.
+   */
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+      boolean getMaster) throws IOException {
+    return getHRegionConnection(regionServer, getMaster,
+        HBaseRPCOptions.DEFAULT);
}
-  public HRegionInterface getHRegionConnection(
-      HServerAddress regionServer)
-      throws IOException {
+  /**
+   * Returns a connection to <code>regionServer</code>. Delegates to the
+   * <code>(HServerAddress, boolean)</code> overload with
+   * <code>getMaster</code> set to false.
+   */
+  public HRegionInterface getHRegionConnection(HServerAddress regionServer)
+      throws IOException {
return getHRegionConnection(regionServer, false);
}
@@ -1385,33 +1537,36 @@ public class HConnectionManager {
/*
* Repeatedly try to find the root region in ZK
+ *
* @return HRegionLocation for root region if found
+ *
* @throws NoServerForRegionException - if the root region can not be
* located after retrying
+ *
* @throws IOException
*/
- private HRegionLocation locateRootRegion()
- throws IOException {
+ private HRegionLocation locateRootRegion() throws IOException {
// We lazily instantiate the ZooKeeper object because we don't want to
// make the constructor have to throw IOException or handle it itself.
ZooKeeperWrapper zk = getZooKeeperWrapper();
HServerAddress rootRegionAddress = null;
- for (int tries = 0; tries < numRetries; tries++) {
+ for (int tries = 0; tries < params.getNumRetries(); tries++) {
int localTimeouts = 0;
// ask the master which server has the root region
- while (rootRegionAddress == null && localTimeouts < numRetries) {
+ while (rootRegionAddress == null
+ && localTimeouts < params.getNumRetries()) {
// Don't read root region until we're out of safe mode so we know
// that the meta regions have been assigned.
rootRegionAddress = zk.readRootRegionLocation();
if (rootRegionAddress == null) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping " + getPauseTime(tries) +
- "ms, waiting for root region.");
+ LOG.debug("Sleeping " + params.getPauseTime(tries)
+ + "ms, waiting for root region.");
}
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(params.getPauseTime(tries));
} catch (InterruptedException iex) {
// continue
}
@@ -1424,6 +1579,7 @@ public class HConnectionManager {
"Timed out trying to locate root region");
}
+ LOG.debug("Trying to get root region at : " + rootRegionAddress);
try {
// Get a connection to the region server
HRegionInterface server = getHRegionConnection(rootRegionAddress);
@@ -1437,20 +1593,16 @@ public class HConnectionManager {
} catch (Throwable t) {
t = translateException(t);
- if (tries == numRetries - 1) {
+ if (tries == params.getNumRetries() - 1) {
throw new NoServerForRegionException("Timed out trying to locate "+
"root region because: " + t.getMessage());
}
// Sleep and retry finding root region.
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Root region location changed. Sleeping.", t);
- }
- Thread.sleep(getPauseTime(tries));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Wake. Retry finding root region.");
- }
+ LOG.debug("Root region location changed. Sleeping.", t);
+ Thread.sleep(params.getPauseTime(tries));
+ LOG.debug("Wake. Retry finding root region.");
} catch (InterruptedException iex) {
// continue
}
@@ -1463,12 +1615,11 @@ public class HConnectionManager {
// and we're sort of sunk
if (rootRegionAddress == null) {
throw new NoServerForRegionException(
- "unable to locate root region server");
+ "unable to locate root region server");
}
// return the region location
- return new HRegionLocation(
- HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
+ return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
}
@Override
@@ -1479,28 +1630,29 @@ public class HConnectionManager {
long callStartTime;
int serverRequestedRetries = 0;
-
+
callStartTime = System.currentTimeMillis();
long serverRequestedWaitTime = 0;
// do not retry if region cannot be located. There are enough retries
// within instantiateRegionLocation.
callable.instantiateRegionLocation(false /* reload cache? */);
- for(int tries = 0; ; tries++) {
+ for (int tries = 0;; tries++) {
// If server requested wait. We will wait for that time, and start
// again. Do not count this time/tries against the client retries.
if (serverRequestedWaitTime > 0) {
serverRequestedRetries++;
- if (serverRequestedRetries > this.maxServerRequestedRetries) {
+ if (serverRequestedRetries > params.getMaxServerRequestedRetries()) {
throw RegionOverloadedException.create(roe, exceptions,
- serverRequestedRetries);
+ serverRequestedWaitTime);
}
long pauseTime = serverRequestedWaitTime + callStartTime
- System.currentTimeMillis();
- LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for " +
- pauseTime +"ms. serverRequestedRetries = " + serverRequestedRetries);
+ LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for "
+ + pauseTime + "ms. serverRequestedRetries = "
+ + serverRequestedRetries);
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
@@ -1520,9 +1672,8 @@ public class HConnectionManager {
if (ioe.getCause() instanceof NotServingRegionException) {
HRegionLocation prevLoc = callable.location;
if (prevLoc.getRegionInfo() != null) {
- deleteCachedLocation(callable.tableName,
- prevLoc.getRegionInfo().getStartKey(),
- prevLoc.getServerAddress());
+ deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+ .getStartKey(), prevLoc.getServerAddress());
}
}
@@ -1536,21 +1687,21 @@ public class HConnectionManager {
// Bail out of the retry loop, immediately
throw exp;
} catch (PreemptiveFastFailException pfe) {
- // Bail out of the retry loop, if the host has been consistently unreachable.
+ // Bail out of the retry loop, if the host has been consistently
+ // unreachable.
throw pfe;
} catch (Throwable t) {
exceptions.add(t);
- if (tries == numRetries - 1) {
+ if (tries == params.getNumRetries() - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries, exceptions);
}
HRegionLocation prevLoc = callable.location;
if (prevLoc.getRegionInfo() != null) {
- deleteCachedLocation(callable.tableName,
- prevLoc.getRegionInfo().getStartKey(),
- prevLoc.getServerAddress());
+ deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+ .getStartKey(), prevLoc.getServerAddress());
}
try {
@@ -1561,18 +1712,18 @@ public class HConnectionManager {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries, exceptions);
}
- if (prevLoc.getServerAddress().
- equals(callable.location.getServerAddress())) {
+ if (prevLoc.getServerAddress().equals(
+ callable.location.getServerAddress())) {
// Bail out of the retry loop if we have to wait too long
- long pauseTime = getPauseTime(tries);
- if ((System.currentTimeMillis() - callStartTime + pauseTime) >
- rpcRetryTimeout) {
+ long pauseTime = params.getPauseTime(tries);
+ if ((System.currentTimeMillis() - callStartTime + pauseTime) > params
+ .getRpcRetryTimeout()) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries,
exceptions);
}
- LOG.debug("getRegionServerWithRetries failed, sleeping for " +
- pauseTime +"ms. tries = " + tries, t);
+ LOG.debug("getRegionServerWithRetries failed, sleeping for "
+ + pauseTime + "ms. tries = " + tries, t);
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
@@ -1583,39 +1734,38 @@ public class HConnectionManager {
// has been re-populated.
callable.instantiateRegionLocation(false);
} else {
- LOG.debug("getRegionServerWithRetries failed, " +
- "region moved from " + prevLoc + " to " + callable.location +
- "retrying immediately tries=" + tries, t);
+ LOG.debug("getRegionServerWithRetries failed, "
+ + "region moved from " + prevLoc + " to " + callable.location
+ + "retrying immediately tries=" + tries, t);
}
}
}
}
- @Override
- public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
- throws IOException, RuntimeException {
- return getRegionServerWithoutRetries(callable, true);
- }
-
-
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will pass it to the defined region server.
- * @param <T> the type of the return value
- * @param callable callable to run
+ *
+ * @param <T>
+ * the type of the return value
+ * @param callable
+ * callable to run
* @return an object of type T
- * @throws IOException if a remote or network exception occurs
- * @throws RuntimeException other unspecified error
- * @throws PreemptiveFastFailException if the remote host has been known to be
- * unreachable for more than this.fastFailThresholdMilliSec.
+ * @throws IOException
+ * if a remote or network exception occurs
+ * @throws RuntimeException
+ * other unspecified error
+ * @throws PreemptiveFastFailException
+ * if the remote host has been known to be unreachable for more
+ * than this.fastFailThresholdMilliSec.
*/
- private <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
- boolean instantiateRegionLocation)
- throws IOException, RuntimeException, PreemptiveFastFailException {
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
+ boolean instantiateRegionLocation) throws IOException,
+ RuntimeException, PreemptiveFastFailException {
FailureInfo fInfo = null;
HServerAddress server = null;
boolean didTry = false;
- boolean couldNotCommunicateWithServer = false;
+ MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
boolean retryDespiteFastFailMode = false;
try {
if (instantiateRegionLocation) {
@@ -1625,18 +1775,18 @@ public class HConnectionManager {
server = callable.getServerAddress();
fInfo = repeatedFailuresMap.get(server);
- if (inFastFailMode(server) &&
- !currentThreadInFastFailMode()) {
+ if (inFastFailMode(server) && !currentThreadInFastFailMode()) {
// In Fast-fail mode, all but one thread will fast fail. Check
// if we are that one chosen thread.
retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
if (retryDespiteFastFailMode == false) { // we don't have to retry
- throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
- fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+ throw new PreemptiveFastFailException(
+ fInfo.numConsecutiveFailures.get(),
+ fInfo.timeOfFirstFailureMilliSec,
+ fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
}
}
didTry = true;
-
callable.instantiateServer();
return callable.call();
} catch (PreemptiveFastFailException pfe) {
@@ -1644,28 +1794,23 @@ public class HConnectionManager {
} catch (ClientSideDoNotRetryException exp) {
throw exp;
} catch (Throwable t1) {
- Throwable t2 = translateException(t1);
- boolean isLocalException = !(t2 instanceof RemoteException);
- // translateException throws DoNotRetryException or any
- // non-IOException.
- if (isLocalException && isNetworkException(t2)) {
- couldNotCommunicateWithServer = true;
- handleFailureToServer(server, t2);
- }
-
- updateClientContext(callable, t2);
- if (t2 instanceof IOException) {
- throw (IOException)t2;
- } else {
- throw new RuntimeException(t2);
- }
+ handleThrowable(t1, callable, couldNotCommunicateWithServer);
+ return null;
} finally {
updateFailureInfoForServer(server, fInfo, didTry,
- couldNotCommunicateWithServer, retryDespiteFastFailMode);
+ couldNotCommunicateWithServer.booleanValue(),
+ retryDespiteFastFailMode);
}
}
- private <T> void updateClientContext(final ServerCallable<T> callable, final Throwable t) {
+ @Override
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+ throws IOException, RuntimeException, PreemptiveFastFailException {
+ return this.getRegionServerWithoutRetries(callable, true);
+ }
+
+ private <T> void updateClientContext(final ServerCallable<T> callable,
+ final Throwable t) {
if (!recordClientContext) {
return;
}
@@ -1676,23 +1821,23 @@ public class HConnectionManager {
this.operationContextPerThread.set(currContext);
}
- currContext.add(
- new OperationContext(callable.location, t));
+ currContext.add(new OperationContext(callable.location, t));
}
/**
* Handles failures encountered when communicating with a server.
*
- * Updates the FailureInfo in repeatedFailuresMap to reflect the
- * failure. Throws RepeatedConnectException if the client is in
- * Fast fail mode.
+ * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
+ * Throws RepeatedConnectException if the client is in Fast fail mode.
*
* @param server
- * @param t - the throwable to be handled.
+ * @param t
+ * - the throwable to be handled.
* @throws PreemptiveFastFailException
*/
private void handleFailureToServer(HServerAddress server, Throwable t) {
- if (server == null || t == null) return;
+ if (server == null || t == null)
+ return;
long currentTime = System.currentTimeMillis();
FailureInfo fInfo = repeatedFailuresMap.get(server);
@@ -1708,13 +1853,14 @@ public class HConnectionManager {
fInfo.numConsecutiveFailures.incrementAndGet();
if (inFastFailMode(server)) {
- // In FastFail mode, do not clear out the cache if it was done recently.
- if (currentTime > fInfo.timeOfLatestCacheClearMilliSec + cacheClearingTimeoutMilliSec) {
- fInfo.timeOfLatestCacheClearMilliSec = currentTime;
- clearCachedLocationForServer(server.toString());
- }
- LOG.error("Exception in FastFail mode : " + t.toString());
- return;
+ // In FastFail mode, do not clear out the cache if it was done recently.
+ if (currentTime > fInfo.timeOfLatestCacheClearMilliSec
+ + cacheClearingTimeoutMilliSec) {
+ fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+ clearCachedLocationForServer(server.toString());
+ }
+ LOG.error("Exception in FastFail mode : " + t.toString());
+ return;
}
// if thrown these exceptions, we clear all the cache entries that
@@ -1727,36 +1873,43 @@ public class HConnectionManager {
/**
* Occasionally cleans up unused information in repeatedFailuresMap.
*
- * repeatedFailuresMap stores the failure information for all
- * remote hosts that had failures. In order to avoid these from growing
- * indefinitely, occassionallyCleanupFailureInformation() will clear these up once
- * every cleanupInterval ms.
+ * repeatedFailuresMap stores the failure information for all remote hosts
+ * that had failures. In order to avoid these from growing indefinitely,
+ * occassionallyCleanupFailureInformation() will clear these up once every
+ * cleanupInterval ms.
*/
private void occasionallyCleanupFailureInformation() {
long now = System.currentTimeMillis();
- if (!(now > lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec))
+ if (!(now > lastFailureMapCleanupTimeMilliSec
+ + failureMapCleanupIntervalMilliSec))
return;
// remove entries that haven't been attempted in a while
// No synchronization needed. It is okay if multiple threads try to
// remove the entry again and again from a concurrent hash map.
StringBuilder sb = new StringBuilder();
- for(Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
+ for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap
+ .entrySet()) {
if (now > entry.getValue().timeOfLatestAttemptMilliSec
- + failureMapCleanupIntervalMilliSec) { // no recent failures
+ + failureMapCleanupIntervalMilliSec) { // no recent failures
repeatedFailuresMap.remove(entry.getKey());
} else if (now > entry.getValue().timeOfFirstFailureMilliSec
- + this.fastFailClearingTimeMilliSec) { // been failing for a long time
- LOG.error(entry.getKey() + " been failing for a long time. clearing out."
- + entry.getValue().toString());
+ + this.fastFailClearingTimeMilliSec) { // been failing for a long
+ // time
+ LOG.error(entry.getKey()
+ + " been failing for a long time. clearing out."
+ + entry.getValue().toString());
repeatedFailuresMap.remove(entry.getKey());
} else {
- sb.append(entry.getKey().toString() + " failing " + entry.getValue().toString() + "\n");
+ sb.append(entry.getKey().toString() + " failing "
+ + entry.getValue().toString() + "\n");
}
}
if (sb.length() > 0
- // If there are multiple threads cleaning up, try to see that only one will log the msg.
- && now > this.lastFailureMapCleanupTimeMilliSec + this.failureMapCleanupIntervalMilliSec) {
+ // If there are multiple threads cleaning up, try to see that only one
+ // will log the msg.
+ && now > this.lastFailureMapCleanupTimeMilliSec
+ + this.failureMapCleanupIntervalMilliSec) {
LOG.warn("Preemptive failure enabled for : " + sb.toString());
}
lastFailureMapCleanupTimeMilliSec = now;
@@ -1765,8 +1918,8 @@ public class HConnectionManager {
/**
* Checks to see if we are in the Fast fail mode for requests to the server.
*
- * If a client is unable to contact a server for more than fastFailThresholdMilliSec
- * the client will get into fast fail mode.
+ * If a client is unable to contact a server for more than
+ * fastFailThresholdMilliSec the client will get into fast fail mode.
*
* @param server
* @return true if the client is in fast fail mode for the server.
@@ -1774,36 +1927,44 @@ public class HConnectionManager {
private boolean inFastFailMode(HServerAddress server) {
FailureInfo fInfo = repeatedFailuresMap.get(server);
// if fInfo is null --> The server is considered good.
- // If the server is bad, wait long enough to believe that the server is down.
- return (fInfo != null && System.currentTimeMillis() >
- fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec);
+ // If the server is bad, wait long enough to believe that the server is
+ // down.
+ return (fInfo != null && System.currentTimeMillis() > fInfo.timeOfFirstFailureMilliSec
+ + this.fastFailThresholdMilliSec);
}
/**
- * Checks to see if the current thread is already in FastFail mode for *some* server.
+ * Checks to see if the current thread is already in FastFail mode for
+ * *some* server.
+ *
* @return true, if the thread is already in FF mode.
*/
private boolean currentThreadInFastFailMode() {
- return (this.threadRetryingInFastFailMode.get() != null &&
- this.threadRetryingInFastFailMode.get().booleanValue() == true);
+ return (this.threadRetryingInFastFailMode.get() != null && this.threadRetryingInFastFailMode
+ .get().booleanValue() == true);
}
/**
- * Check to see if the client should try to connnect to the server, inspite of
- * knowing that it is in the fast fail mode.
+ * Check to see if the client should try to connnect to the server, inspite
+ * of knowing that it is in the fast fail mode.
*
[... 1910 lines stripped ...]