You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:09 UTC
svn commit: r1584162 [2/5] - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
test/java/org/apache/hadoop/hbase/client/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1584162&r1=1584161&r2=1584162&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Apr 2 20:49:09 2014
@@ -19,82 +19,16 @@
*/
package org.apache.hadoop.hbase.client;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
-import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HConnectionParams;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
-import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.MetaUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.thrift.transport.TTransportException;
-import java.io.EOFException;
import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.SyncFailedException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* A non-instantiable class that manages connections to multiple tables in
@@ -248,3390 +182,4 @@ public class HConnectionManager {
TableServers connection = (TableServers) getConnection(conf);
return connection.isRegionCached(tableName, row);
}
-
- /* Encapsulates finding the servers for an HBase instance */
- public static class TableServers implements ServerConnection {
- static final Log LOG = LogFactory.getLog(TableServers.class);
- private final Class<? extends HRegionInterface> serverInterfaceClass;
- private final int prefetchRegionLimit;
-
- private final Object masterLock = new Object();
- private volatile boolean closed;
- private volatile HMasterInterface master;
- private volatile boolean masterChecked;
-
- private final Object rootRegionLock = new Object();
- private final Object metaRegionLock = new Object();
- private final Object userRegionLock = new Object();
-
- private volatile Configuration conf;
- private final HConnectionParams params;
-
- // Used by master and region servers during safe mode only
- private volatile HRegionLocation rootRegionLocation;
-
- private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations = new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
-
- // amount of time to wait before we consider a server to be in fast fail
- // mode
- protected long fastFailThresholdMilliSec;
- // Keeps track of failures when we cannot talk to a server. Helps in
- // fast failing clients if the server is down for a long time.
- protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<HServerAddress, FailureInfo>();
- // We populate repeatedFailuresMap every time there is a failure. So, to
- // keep it
- // from growing unbounded, we garbage collect the failure information
- // every cleanupInterval.
- protected final long failureMapCleanupIntervalMilliSec;
- protected volatile long lastFailureMapCleanupTimeMilliSec;
- // Amount of time that has to pass, before we clear region -> regionserver
- // cache
- // again, when in fast fail mode. This is used to clean unused entries.
- protected long cacheClearingTimeoutMilliSec;
- // clear failure Info. Used to clean out all entries.
- // A safety valve, in case the client does not exit the
- // fast fail mode for any reason.
- private long fastFailClearingTimeMilliSec;
- private final boolean recordClientContext;
-
- private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
-
- public void resetOperationContext() {
- if (!recordClientContext || this.operationContextPerThread == null) {
- return;
- }
-
- List<OperationContext> currContext = this.operationContextPerThread.get();
-
- if (currContext != null) {
- currContext.clear();
- }
- }
-
- public List<OperationContext> getAndResetOperationContext() {
- if (!recordClientContext || this.operationContextPerThread == null) {
- return null;
- }
-
- List<OperationContext> currContext = this.operationContextPerThread.get();
-
- if (currContext == null) {
- return null;
- }
-
- ArrayList<OperationContext> context = new ArrayList<OperationContext>(
- currContext);
-
- // Made a copy, clear the context
- currContext.clear();
-
- return context;
- }
-
- /**
- * Keeps track of repeated failures to any region server.
- *
- * @author amitanand.s
- *
- */
- protected class FailureInfo {
- // The number of consecutive failures.
- private final AtomicLong numConsecutiveFailures = new AtomicLong();
- // The time when the server started to become unresponsive
- // Once set, this would never be updated.
- private long timeOfFirstFailureMilliSec;
- // The time when the client last tried to contact the server.
- // This is only updated by one client at a time
- private volatile long timeOfLatestAttemptMilliSec;
- // The time when the client last cleared cache for regions assigned
- // to the server. Used to ensure we don't clearCache too often.
- private volatile long timeOfLatestCacheClearMilliSec;
- // Used to keep track of concurrent attempts to contact the server.
- // In Fast fail mode, we want just one client thread to try to connect
- // the rest of the client threads will fail fast.
- private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
- false);
-
- public String toString() {
- return "FailureInfo: numConsecutiveFailures = "
- + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
- + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
- + timeOfLatestAttemptMilliSec
- + " timeOfLatestCacheClearMilliSec = "
- + timeOfLatestCacheClearMilliSec
- + " exclusivelyRetringInspiteOfFastFail = "
- + exclusivelyRetringInspiteOfFastFail.get();
- }
-
- FailureInfo(long firstFailureTime) {
- this.timeOfFirstFailureMilliSec = firstFailureTime;
- }
- }
-
- private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<MutableBoolean>();
-
- // For TESTING purposes only;
- public Map<HServerAddress, FailureInfo> getFailureMap() {
- return repeatedFailuresMap;
- }
-
- // The presence of a server in the map implies it's likely that there is an
- // entry in cachedRegionLocations that map to this server; but the absence
- // of a server in this map guarentees that there is no entry in cache that
- // maps to the absent server.
- private final Set<String> cachedServers =
- new HashSet<String>();
-
- // region cache prefetch is enabled by default. this set contains all
- // tables whose region cache prefetch are disabled.
- private final Set<Integer> regionCachePrefetchDisabledTables =
- new CopyOnWriteArraySet<Integer>();
- // The number of times we will retry after receiving a RegionOverloadedException from the
- // region server. Defaults to 0 (i.e. we will throw the exception and let the client handle retries)
- // may not always be what you want. But, for the purposes of the HBaseThrift client, that this is
- // created for, we do not want the thrift layer to hold up IPC threads handling retries.
- private int maxServerRequestedRetries;
-
- // keep track of servers that have been updated for batchedLoad
- // tablename -> Map
- Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
- private int batchedUploadSoftFlushRetries;
- private long batchedUploadSoftFlushTimeoutMillis;
- private final boolean useThrift;
-
- private ConcurrentSkipListSet<byte[]> initializedTableSet =
- new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
- /**
- * constructor
- * @param conf Configuration object
- */
- @SuppressWarnings("unchecked")
- public TableServers(Configuration conf) {
- this.conf = conf;
- params = HConnectionParams.getInstance(conf);
- this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
- HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
-
- String serverClassName =
- conf.get(HConstants.REGION_SERVER_CLASS,
- HConstants.DEFAULT_REGION_SERVER_CLASS);
-
- this.closed = false;
-
- try {
- this.serverInterfaceClass =
- (Class<? extends HRegionInterface>) Class.forName(serverClassName);
-
- } catch (ClassNotFoundException e) {
- throw new UnsupportedOperationException(
- "Unable to find region server interface " + serverClassName, e);
- }
-
- // TODO move parameters below into HConnectionParams
- this.cacheClearingTimeoutMilliSec = conf.getLong(
- "hbase.client.fastfail.cache.clear.interval",
- 10000); // 10 sec
- this.fastFailThresholdMilliSec = conf.getLong(
- "hbase.client.fastfail.threshold",
- 60000); // 1 min
- this.failureMapCleanupIntervalMilliSec = conf.getLong(
- "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
- this.fastFailClearingTimeMilliSec = conf.getLong(
- "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins
-
- this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
- 10);
-
- this.master = null;
- this.masterChecked = false;
- this.batchedUploadSoftFlushRetries =
- conf.getInt("hbase.client.batched-upload.softflush.retries", 10);
- this.batchedUploadSoftFlushTimeoutMillis =
- conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
- batchedUploadUpdatesMap = new ConcurrentHashMap<String,
- ConcurrentMap<HRegionInfo, HRegionLocation>>();
-
- this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
-
- }
-
- // Used by master and region servers during safe mode only
- public void unsetRootRegionLocation() {
- this.rootRegionLocation = null;
- }
-
- // Used by master and region servers during safe mode only
- public void setRootRegionLocation(HRegionLocation rootRegion) {
- if (rootRegion == null) {
- throw new IllegalArgumentException(
- "Cannot set root region location to null.");
- }
- this.rootRegionLocation = rootRegion;
- }
-
- public HMasterInterface getMaster() throws MasterNotRunningException {
- ZooKeeperWrapper zk;
- try {
- zk = getZooKeeperWrapper();
- } catch (IOException e) {
- throw new MasterNotRunningException(e);
- }
-
- HServerAddress masterLocation = null;
- synchronized (this.masterLock) {
- for (int tries = 0; !this.closed && !this.masterChecked
- && this.master == null && tries < params.getNumRetries(); tries++) {
-
- try {
- masterLocation = zk.readMasterAddress(zk);
-
- if (masterLocation != null) {
- HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy(
- HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
- masterLocation.getInetSocketAddress(), this.conf,
- params.getRpcTimeout(), HBaseRPCOptions.DEFAULT);
-
- if (tryMaster.isMasterRunning()) {
- this.master = tryMaster;
- this.masterLock.notifyAll();
- break;
- }
- }
-
- } catch (IOException e) {
- if (tries == params.getNumRetries() - 1) {
- // This was our last chance - don't bother sleeping
- LOG.info(
- "getMaster attempt " + tries + " of "
- + params.getNumRetries() + " failed; no more retrying.",
- e);
- break;
- }
- LOG.info(
- "getMaster attempt " + tries + " of " + params.getNumRetries()
- + " failed; retrying after sleep of "
- + params.getPauseTime(tries), e);
- }
-
- // Cannot connect to master or it is not running. Sleep & retry
- try {
- this.masterLock.wait(params.getPauseTime(tries));
- } catch (InterruptedException e) {
- // continue
- }
- }
- this.masterChecked = true;
- }
- if (this.master == null) {
- if (masterLocation == null) {
- throw new MasterNotRunningException();
- }
- throw new MasterNotRunningException(masterLocation.toString());
- }
- return this.master;
- }
-
- public boolean isMasterRunning() {
- if (this.master == null) {
- try {
- getMaster();
-
- } catch (MasterNotRunningException e) {
- return false;
- }
- }
- return true;
- }
-
- public boolean tableExists(final byte[] tableName)
- throws MasterNotRunningException {
- getMaster();
- if (tableName == null) {
- throw new IllegalArgumentException("Table name cannot be null");
- }
- if (isMetaTableName(tableName)) {
- return true;
- }
- boolean exists = false;
- try {
- HTableDescriptor[] tables = listTables();
- for (HTableDescriptor table : tables) {
- if (Bytes.equals(table.getName(), tableName)) {
- exists = true;
- }
- }
- } catch (IOException e) {
- LOG.warn("Testing for table existence threw exception", e);
- }
- return exists;
- }
-
- /*
- * @param n
- *
- * @return Truen if passed tablename <code>n</code> is equal to the name of
- * a catalog table.
- */
- private static boolean isMetaTableName(final byte[] n) {
- return MetaUtils.isMetaTableName(n);
- }
-
- public HRegionLocation getRegionLocation(final byte[] name,
- final byte[] row, boolean reload) throws IOException {
- return reload ? relocateRegion(name, row) : locateRegion(name, row);
- }
-
- public HTableDescriptor[] listTables() throws IOException {
- getMaster();
- final TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
- MetaScannerVisitor visitor = new MetaScannerVisitor() {
- public boolean processRow(Result result) throws IOException {
- try {
- byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- HRegionInfo info = null;
- if (value != null) {
- info = Writables.getHRegionInfo(value);
- }
- // Only examine the rows where the startKey is zero length
- if (info != null && info.getStartKey().length == 0) {
- uniqueTables.add(info.getTableDesc());
- }
- return true;
- } catch (RuntimeException e) {
- LOG.error("Result=" + result);
- throw e;
- }
- }
- };
- MetaScanner.metaScan(conf, visitor);
-
- return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
- }
-
- public boolean isTableEnabled(byte[] tableName) throws IOException {
- return testTableOnlineState(tableName, true);
- }
-
- public boolean isTableDisabled(byte[] tableName) throws IOException {
- return testTableOnlineState(tableName, false);
- }
-
- public boolean isTableAvailable(final byte[] tableName) throws IOException {
- final AtomicBoolean available = new AtomicBoolean(true);
- MetaScannerVisitor visitor = new MetaScannerVisitor() {
- @Override
- public boolean processRow(Result row) throws IOException {
- byte[] value = row.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- HRegionInfo info = Writables.getHRegionInfoOrNull(value);
- if (info != null) {
- if (Bytes.equals(tableName, info.getTableDesc().getName())) {
- value = row.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (value == null) {
- available.set(false);
- return false;
- }
- }
- }
- return true;
- }
- };
- MetaScanner.metaScan(conf, visitor);
- return available.get();
- }
-
- /*
- * If online == true Returns true if all regions are online Returns false in
- * any other case If online == false Returns true if all regions are offline
- * Returns false in any other case
- */
- private boolean testTableOnlineState(byte[] tableName, boolean online)
- throws IOException {
- if (!tableExists(tableName)) {
- throw new TableNotFoundException(Bytes.toString(tableName));
- }
- if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
- // The root region is always enabled
- return true;
- }
- int rowsScanned = 0;
- int rowsOffline = 0;
- byte[] startKey = HRegionInfo.createRegionName(tableName, null,
- HConstants.ZEROES, false);
- byte[] endKey;
- HRegionInfo currentRegion;
- Scan scan = new Scan(startKey);
- scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
- int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
- scan.setCaching(rows);
- ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName,
- HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME
- : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT);
- try {
- // Open scanner
- getRegionServerWithRetries(s);
- do {
- currentRegion = s.getHRegionInfo();
- Result r;
- Result[] rrs;
- while ((rrs = getRegionServerWithRetries(s)) != null
- && rrs.length > 0) {
- r = rrs[0];
- byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- if (value != null) {
- HRegionInfo info = Writables.getHRegionInfoOrNull(value);
- if (info != null) {
- if (Bytes.equals(info.getTableDesc().getName(), tableName)) {
- rowsScanned += 1;
- rowsOffline += info.isOffline() ? 1 : 0;
- }
- }
- }
- }
- endKey = currentRegion.getEndKey();
- } while (!(endKey == null || Bytes.equals(endKey,
- HConstants.EMPTY_BYTE_ARRAY)));
- } finally {
- s.setClose();
- // Doing below will call 'next' again and this will close the scanner
- // Without it we leave scanners open.
- getRegionServerWithRetries(s);
- }
- LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
- boolean onOffLine = online ? rowsOffline == 0
- : rowsOffline == rowsScanned;
- return rowsScanned > 0 && onOffLine;
- }
-
- private static class HTableDescriptorFinder implements
- MetaScanner.MetaScannerVisitor {
- byte[] tableName;
- HTableDescriptor result;
-
- protected HTableDescriptorFinder(byte[] tableName) {
- this.tableName = tableName;
- }
-
- public boolean processRow(Result rowResult) throws IOException {
- HRegionInfo info = Writables.getHRegionInfo(rowResult.getValue(
- HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
- HTableDescriptor desc = info.getTableDesc();
- if (Bytes.compareTo(desc.getName(), tableName) == 0) {
- result = desc;
- return false;
- }
- return true;
- }
-
- HTableDescriptor getResult() {
- return result;
- }
- }
-
- public HTableDescriptor getHTableDescriptor(final byte[] tableName)
- throws IOException {
- if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
- return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
- }
- if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
- return HTableDescriptor.META_TABLEDESC;
- }
- HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName);
- MetaScanner.metaScan(conf, finder, tableName);
- HTableDescriptor result = finder.getResult();
- if (result == null) {
- throw new TableNotFoundException(Bytes.toString(tableName));
- }
- return result;
- }
-
- public HRegionLocation locateRegion(final byte[] tableName, final byte[] row)
- throws IOException {
- return locateRegion(tableName, row, true);
- }
-
- public HRegionLocation relocateRegion(final byte[] tableName,
- final byte[] row) throws IOException {
- return locateRegion(tableName, row, false);
- }
-
- private HRegionLocation locateRegion(final byte[] tableName,
- final byte[] row, boolean useCache) throws IOException {
- if (tableName == null || tableName.length == 0) {
- throw new IllegalArgumentException(
- "table name cannot be null or zero length");
- }
- if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
- return locateMetaInRoot(row, useCache, metaRegionLock);
- } else if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
- synchronized (rootRegionLock) {
- // This block guards against two threads trying to find the root
- // region at the same time. One will go do the find while the
- // second waits. The second thread will not do find.
-
- if (!useCache || rootRegionLocation == null
- || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
- this.rootRegionLocation = locateRootRegion();
- LOG.info("Updated rootRegionLocation from ZK to : " + this.rootRegionLocation);
- }
- return this.rootRegionLocation;
- }
- } else {
- // Region not in the cache - have to go to the meta RS
- return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
- useCache, userRegionLock);
- }
- }
-
- private HRegionLocation prefetchRegionCache(final byte[] tableName,
- final byte[] row) {
- return prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
- }
-
- /*
- * Search .META. for the HRegionLocation info that contains the table and
- * row we're seeking. It will prefetch certain number of regions info and
- * save them to the global region cache.
- */
- private HRegionLocation prefetchRegionCache(final byte[] tableName,
- final byte[] row, int prefetchRegionLimit) {
- // Implement a new visitor for MetaScanner, and use it to walk through
- // the .META.
- MetaScannerVisitor visitor = new MetaScannerVisitor() {
- public boolean processRow(Result result) throws IOException {
- try {
- byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- HRegionInfo regionInfo = null;
-
- if (value != null) {
- // convert the row result into the HRegionLocation we need!
- regionInfo = Writables.getHRegionInfo(value);
-
- // possible we got a region of a different table...
- if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
- return false; // stop scanning
- }
- if (regionInfo.isOffline()) {
- // don't cache offline regions
- return true;
- }
- value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (value == null) {
- return true; // don't cache it
- }
- final String serverAddress = Bytes.toString(value);
-
- value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER);
- long serverStartCode = -1;
- if (value != null) {
- serverStartCode = Bytes.toLong(value);
- }
-
- // instantiate the location
- HRegionLocation loc = new HRegionLocation(regionInfo,
- new HServerAddress(serverAddress), serverStartCode);
- // cache this meta entry
- cacheLocation(tableName, loc);
- }
- return true;
- } catch (RuntimeException e) {
- throw new IOException(e);
- }
- }
- };
- try {
- // pre-fetch certain number of regions info at region cache.
- MetaScanner.metaScan(conf, visitor, tableName, row, prefetchRegionLimit);
- return getCachedLocation(tableName, row);
- } catch (IOException e) {
- LOG.warn("Encounted problems when prefetch META table: ", e);
- }
- return null;
- }
-
- /**
- * Search the meta table (.META.) for the HRegionLocation info that
- * contains the table and row we're seeking.
- */
- private HRegionLocation locateRegionInMeta(final byte [] parentTable,
- final byte [] tableName, final byte [] row, boolean useCache,
- Object regionLockObject)
- throws IOException {
- HRegionLocation location;
- if (useCache) {
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
- }
- }
-
- // If we are supposed to be using the cache, look in the cache to see if
- // we already have the region.
-
- // build the key of the meta region we should be looking for.
- // the extra 9's on the end are necessary to allow "exact" matches
- // without knowing the precise region names.
- byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
- HConstants.NINES, false);
- for (int tries = 0; true; tries++) {
- if (tries >= params.getNumRetries()) {
- throw new NoServerForRegionException("Unable to find region for "
- + Bytes.toStringBinary(row) + " after " + params.getNumRetries() +
- " tries.");
- }
-
- FailureInfo fInfo = null;
- HServerAddress server = null;
- boolean didTry = false;
- boolean couldNotCommunicateWithServer = false;
- boolean retryDespiteFastFailMode = false;
- try {
- // locate the root or meta region
- HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
-
- server = metaLocation.getServerAddress();
- fInfo = repeatedFailuresMap.get(server);
-
- // Handle the case where .META. is on an unresponsive server.
- if (inFastFailMode(server) &&
- !this.currentThreadInFastFailMode()) {
- // In Fast-fail mode, all but one thread will fast fail. Check
- // if we are that one chosen thread.
-
- retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
-
- if (retryDespiteFastFailMode == false) { // we don't have to retry
- throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
- fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
- }
- }
- didTry = true;
-
- HBaseThriftRPC.isMeta.get().push(true);
- Result regionInfoRow = null;
- try {
- // This block guards against two threads trying to load the meta
- // region at the same time. The first will load the meta region and
- // the second will use the value that the first one found.
- synchronized (regionLockObject) {
- // Check the cache again for a hit in case some other thread made the
- // same query while we were waiting on the lock. If not supposed to
- // be using the cache, delete any existing cached location so it won't
- // interfere.
- if (useCache) {
- location = getCachedLocation(tableName, row);
- if (location != null) {
- return location;
- }
- } else {
- LOG.debug("Deleting the client location cache.");
- deleteCachedLocation(tableName, row, null);
- }
-
- // If the parent table is META, we may want to pre-fetch some
- // region info into the global region cache for this table.
- if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
- (getRegionCachePrefetch(tableName)) ) {
- LOG.debug("Prefetching the client location cache.");
- location = prefetchRegionCache(tableName, row);
- if (location != null) {
- return location;
- }
- }
-
- HRegionInterface serverInterface =
- getHRegionConnection(metaLocation.getServerAddress());
-
- // Query the root or meta region for the location of the meta region
- regionInfoRow = serverInterface.getClosestRowBefore(
- metaLocation.getRegionInfo().getRegionName(), metaKey,
- HConstants.CATALOG_FAMILY);
- }
- } catch (Exception e) {
- throw e;
- } finally {
- HBaseThriftRPC.isMeta.get().pop();
- }
- location = getLocationFromRow(regionInfoRow, tableName,
- parentTable, row);
- cacheLocation(tableName, location);
- return location;
- } catch (TableNotFoundException e) {
- // if we got this error, probably means the table just plain doesn't
- // exist. rethrow the error immediately. this should always be coming
- // from the HTable constructor.
- throw e;
- } catch (PreemptiveFastFailException e) {
- // already processed this. Don't process this again.
- throw e;
- } catch (IOException e) {
- if (e instanceof RemoteException) {
- e = RemoteExceptionHandler
- .decodeRemoteException((RemoteException) e);
- } else if (isNetworkException(e)) {
- couldNotCommunicateWithServer = true;
- handleFailureToServer(server, e);
- }
- if (tries < params.getNumRetries() - 1) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("locateRegionInMeta attempt " + tries + " of "
- + params.getNumRetries()
- + " failed; retrying after sleep of "
- + params.getPauseTime(tries) + " because: " + e.getMessage());
- }
- } else {
- throw e;
- }
- // Only relocate the parent region if necessary
- if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
- relocateRegion(parentTable, row);
- }
- } catch (Exception e) {
- couldNotCommunicateWithServer = true;
- handleFailureToServer(server, e);
- if (tries < params.getNumRetries() - 1) {
- LOG.debug("locateRegionInMeta attempt " + tries + " of "
- + params.getNumRetries() + " failed; retrying after sleep of "
- + params.getPauseTime(tries) + " because: " + e.getMessage());
- } else {
- throw e;
- }
- } finally {
- updateFailureInfoForServer(server, fInfo, didTry,
- couldNotCommunicateWithServer, retryDespiteFastFailMode);
- }
- try {
- Thread.sleep(params.getPauseTime(tries));
- } catch (InterruptedException e) {
- // continue
- }
- }
- }
-
- /**
-  * Converts a catalog row read from {@code parentTable} into an
-  * {@link HRegionLocation} for a region of {@code tableName}.
-  *
-  * @param regionInfoRow catalog row returned by the closest-row-before lookup;
-  *        may be null
-  * @param tableName table whose region is being located
-  * @param parentTable catalog table the row was read from (used in messages)
-  * @param row user row that was looked up (used in messages)
-  * @return the location described by the row; the start code is -1 when the
-  *         row carries none
-  * @throws TableNotFoundException if the row is null or belongs to another table
-  * @throws RegionOfflineException if the region is marked offline
-  * @throws NoServerForRegionException if the row lists no server address
-  * @throws IOException if the region-info cell is missing or empty
-  */
- private HRegionLocation getLocationFromRow(Result regionInfoRow,
-     byte[] tableName, byte[] parentTable, byte[] row) throws IOException {
-   if (regionInfoRow == null) {
-     throw new TableNotFoundException(Bytes.toString(tableName));
-   }
-
-   byte[] infoBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-       HConstants.REGIONINFO_QUALIFIER);
-   if (infoBytes == null || infoBytes.length == 0) {
-     throw new IOException("HRegionInfo was null or empty in "
-         + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
-   }
-   HRegionInfo info = (HRegionInfo) Writables.getWritable(infoBytes,
-       new HRegionInfo());
-
-   // A closest-row-before lookup can land on a region of a neighbouring table.
-   boolean sameTable = Bytes.equals(info.getTableDesc().getName(), tableName);
-   if (!sameTable) {
-     throw new TableNotFoundException("Table '" + Bytes.toString(tableName)
-         + "' was not found.");
-   }
-   if (info.isOffline()) {
-     throw new RegionOfflineException("region offline: "
-         + info.getRegionNameAsString());
-   }
-
-   byte[] serverBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-       HConstants.SERVER_QUALIFIER);
-   String serverAddress = (serverBytes == null) ? "" : Bytes.toString(serverBytes);
-   if (serverAddress.isEmpty()) {
-     throw new NoServerForRegionException("No server address listed "
-         + "in " + Bytes.toString(parentTable) + " for region "
-         + info.getRegionNameAsString() + " containing row "
-         + Bytes.toStringBinary(row));
-   }
-
-   byte[] startCodeBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
-       HConstants.STARTCODE_QUALIFIER);
-   long startCode = (startCodeBytes == null) ? -1 : Bytes.toLong(startCodeBytes);
-
-   return new HRegionLocation(info, new HServerAddress(serverAddress), startCode);
- }
-
- /**
-  * Searches the ROOT table for the META region that contains {@code row}.
-  * Retries up to params.getNumRetries() times, sleeping
-  * params.getPauseTime(tries) between attempts, and throws if the region is
-  * not found.
-  *
-  * TODO: much of this duplicates
-  * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)};
-  * the two should be merged.
-  *
-  * @param row row whose META region is wanted
-  * @param useCache whether cached META locations and the cached ROOT location
-  *        may be used; when false the cached META entry for {@code row} is
-  *        deleted before the lookup
-  * @param regionLockObject not used by this method
-  * @return location of the META region holding {@code row}
-  * @throws NoServerForRegionException if all retries are used up
-  * @throws TableNotFoundException if the ROOT row does not describe META
-  * @throws PreemptiveFastFailException if the ROOT server is in fast-fail
-  *         mode and this thread was not chosen to retry
-  * @throws IOException for other failures on the final attempt
-  */
- private HRegionLocation locateMetaInRoot(final byte[] row,
-     boolean useCache, Object regionLockObject) throws IOException {
-   HRegionLocation location;
-   final byte[] parentTable = HConstants.ROOT_TABLE_NAME;
-   final byte[] tableName = HConstants.META_TABLE_NAME;
-   // If we are supposed to be using the cache, look in the cache to see if
-   // we already have the region.
-   if (useCache) {
-     location = getCachedLocation(tableName, row);
-     if (location != null) {
-       return location;
-     }
-   }
-
-   // build the key of the meta region we should be looking for.
-   // the extra 9's on the end are necessary to allow "exact" matches
-   // without knowing the precise region names.
-   byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
-       HConstants.NINES, false);
-   for (int tries = 0; true; tries++) {
-     if (tries >= params.getNumRetries()) {
-       throw new NoServerForRegionException("Unable to find region for "
-           + Bytes.toStringBinary(row) + " after " + params.getNumRetries()
-           + " tries.");
-     }
-
-     TableServers.FailureInfo fInfo = null;
-     HServerAddress server = null;
-     boolean didTry = false;
-     boolean couldNotCommunicateWithServer = false;
-     boolean retryDespiteFastFailMode = false;
-     // Records whether this attempt pushed onto isMeta. The fast-fail exit
-     // and ROOT-lookup failures happen before the push, so the finally
-     // block must pop only if the push actually happened.
-     boolean pushedIsMeta = false;
-     try {
-       // locate the root region
-       HRegionLocation metaLocation = null;
-       if (useCache && rootRegionLocation != null
-           && !inFastFailMode(this.rootRegionLocation.getServerAddress())) {
-         metaLocation = rootRegionLocation;
-       } else {
-         synchronized (rootRegionLock) {
-           // This block guards against two threads trying to find the root
-           // region at the same time. One will go do the find while the
-           // second waits. The second thread will not do find.
-           if (!useCache || rootRegionLocation == null
-               || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
-             HBaseThriftRPC.isMeta.get().push(true);
-             try {
-               this.rootRegionLocation = locateRootRegion();
-             } finally {
-               HBaseThriftRPC.isMeta.get().pop();
-             }
-             LOG.info("Updated rootRegionLocation from ZK to : "
-                 + this.rootRegionLocation);
-           }
-           metaLocation = this.rootRegionLocation;
-         }
-       }
-       Preconditions.checkNotNull(metaLocation);
-       server = metaLocation.getServerAddress();
-       fInfo = repeatedFailuresMap.get(server);
-
-       // Handle the case where ROOT is on an unresponsive server.
-       if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) {
-         // In Fast-fail mode, all but one thread will fast fail. Check
-         // if we are that one chosen thread.
-         retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
-
-         if (!retryDespiteFastFailMode) { // we don't have to retry
-           throw new PreemptiveFastFailException(
-               fInfo.numConsecutiveFailures.get(),
-               fInfo.timeOfFirstFailureMilliSec,
-               fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
-         }
-       }
-       didTry = true;
-       HBaseThriftRPC.isMeta.get().push(true);
-       pushedIsMeta = true;
-       // This block guards against two threads trying to load the meta
-       // region at the same time. The first will load the meta region and
-       // the second will use the value that the first one found.
-       synchronized (metaRegionLock) {
-         if (useCache) {
-           location = getCachedLocation(tableName, row);
-           if (location != null) {
-             return location;
-           }
-         } else {
-           LOG.debug("Deleting the client location cache.");
-           deleteCachedLocation(tableName, row, null);
-         }
-         HRegionInterface serverInterface = getHRegionConnection(metaLocation
-             .getServerAddress());
-
-         // Query the root for the location of the meta region
-         Result regionInfoRow = serverInterface.getClosestRowBefore(
-             metaLocation.getRegionInfo().getRegionName(), metaKey,
-             HConstants.CATALOG_FAMILY);
-         location = getLocationFromRow(regionInfoRow, tableName,
-             parentTable, row);
-         cacheLocation(tableName, location);
-       }
-       return location;
-     } catch (TableNotFoundException e) {
-       // if we got this error, probably means the table just plain doesn't
-       // exist. rethrow the error immediately. this should always be coming
-       // from the HTable constructor.
-       throw e;
-     } catch (PreemptiveFastFailException e) {
-       // already processed this. Don't process this again.
-       throw e;
-     } catch (IOException e) {
-       if (e instanceof RemoteException) {
-         e = RemoteExceptionHandler
-             .decodeRemoteException((RemoteException) e);
-       } else if (isNetworkException(e)) {
-         couldNotCommunicateWithServer = true;
-         handleFailureToServer(server, e);
-       }
-       if (tries < params.getNumRetries() - 1) {
-         LOG.debug("IOException locateMetaInRoot attempt " + tries
-             + " of " + params.getNumRetries()
-             + " failed; retrying after sleep of "
-             + params.getPauseTime(tries) + " because: " + e.getMessage());
-       } else {
-         throw e;
-       }
-     } catch (Exception e) {
-       couldNotCommunicateWithServer = true;
-       handleFailureToServer(server, e);
-       if (tries < params.getNumRetries() - 1) {
-         LOG.debug("Exception locateMetaInRoot attempt " + tries + " of "
-             + params.getNumRetries() + " failed; retrying after sleep of "
-             + params.getPauseTime(tries) + " because: " + e.getMessage());
-       } else {
-         throw e;
-       }
-     } finally {
-       if (pushedIsMeta) {
-         HBaseThriftRPC.isMeta.get().pop();
-       }
-       updateFailureInfoForServer(server, fInfo, didTry,
-           couldNotCommunicateWithServer, retryDespiteFastFailMode);
-     }
-     try {
-       Thread.sleep(params.getPauseTime(tries));
-     } catch (InterruptedException e) {
-       // continue (interrupt status is not restored here)
-     }
-   }
- }
-
-
-
-
- /**
-  * Tells whether {@code e} means the client could not reach or talk to the
-  * server, as opposed to an error reported by the server.
-  *
-  * The list is not exhaustive. For example, SocketOutputStream sometimes
-  * throws a plain IOException when its channel is closed, and that case is
-  * not recognised here.
-  *
-  * @param e throwable to classify; null yields false
-  * @return true if {@code e} is one of the recognised connectivity exceptions
-  */
- private boolean isNetworkException(Throwable e) {
-   if (e instanceof SocketTimeoutException || e instanceof ConnectException) {
-     return true;
-   }
-   if (e instanceof ClosedChannelException || e instanceof SyncFailedException) {
-     return true;
-   }
-   return e instanceof EOFException || e instanceof TTransportException;
- }
-
- /**
-  * Returns the cached region locations of {@code tableName}. Before
-  * returning, the cache is populated via prefetchRegionCache when
-  * {@code forceRefresh} is set or when this table has not been prefetched yet.
-  *
-  * @param tableName table whose locations are wanted
-  * @param forceRefresh whether to prefetch even if done before
-  * @return a view of the table's cached locations
-  */
- public Collection<HRegionLocation> getCachedHRegionLocations(final byte [] tableName,
-     boolean forceRefresh) {
-   boolean needsPrefetch = forceRefresh || !initializedTableSet.contains(tableName);
-   if (needsPrefetch) {
-     prefetchRegionCache(tableName, null, Integer.MAX_VALUE);
-     initializedTableSet.add(tableName);
-   }
-   return getTableLocations(tableName).values();
- }
-
- /*
-  * Looks up, in the location cache, the region of {@code tableName} that
-  * should contain {@code row}.
-  *
-  * First tries an exact start-key hit. Otherwise it takes the cached region
-  * with the greatest start key <= row and accepts it only if row sorts
-  * before that region's end key, or if the end key is EMPTY_END_ROW (the
-  * table's last region).
-  *
-  * <p>TODO: This method during writing consumes 15% of CPU doing lookup
-  * into the Soft Reference SortedMap. Improve.
-  *
-  * @return the cached location, or null on a cache miss
-  */
- HRegionLocation getCachedLocation(final byte [] tableName,
-     final byte [] row) {
-   ConcurrentSkipListMap<byte [], HRegionLocation> cache =
-       getTableLocations(tableName);
-   if (cache.isEmpty()) {
-     return null;
-   }
-
-   // Fast path: the row is exactly a cached region's start key.
-   HRegionLocation exact = cache.get(row);
-   if (exact != null) {
-     if (LOG.isTraceEnabled()) {
-       LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row)
-           + "> in tableName " + Bytes.toString(tableName)
-           + ": location server " + exact.getServerAddress()
-           + ", location region name "
-           + exact.getRegionInfo().getRegionNameAsString());
-     }
-     return exact;
-   }
-
-   Entry<byte[], HRegionLocation> floor = cache.floorEntry(row);
-   HRegionLocation candidate = (floor == null) ? null : floor.getValue();
-   if (candidate == null) {
-     return null;
-   }
-
-   byte[] endKey = candidate.getRegionInfo().getEndKey();
-   boolean isLastRegion = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
-   if (isLastRegion || KeyValue.getRowComparator(tableName).compareRows(endKey, 0,
-       endKey.length, row, 0, row.length) > 0) {
-     return candidate;
-   }
-   // The row lies past the candidate's end key: complete cache miss.
-   return null;
- }
-
- /*
-  * Removes the cached location covering {@code row} in {@code tableName}.
-  * When {@code oldServer} is non-null, the entry is removed only if it still
-  * points at that server, so an entry that was already refreshed is kept.
-  */
- @Override
- public void deleteCachedLocation(final byte[] tableName, final byte[] row,
-     HServerAddress oldServer) {
-   synchronized (this.cachedRegionLocations) {
-     Map<byte[], HRegionLocation> cache = getTableLocations(tableName);
-     if (cache.isEmpty()) {
-       return;
-     }
-     HRegionLocation cached = getCachedLocation(tableName, row);
-     if (cached == null) {
-       return;
-     }
-     boolean pointsElsewhere = oldServer != null
-         && !oldServer.equals(cached.getServerAddress());
-     if (pointsElsewhere) {
-       return; // perhaps somebody else cleared and repopulated it.
-     }
-     cache.remove(cached.getRegionInfo().getStartKey());
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("Removed " + cached.getRegionInfo().getRegionNameAsString()
-           + " for tableName=" + Bytes.toString(tableName)
-           + " from cache " + "because of " + Bytes.toStringBinary(row));
-     }
-   }
- }
-
- /*
-  * Removes every cached region location, across all tables, whose server
-  * address string equals {@code server}, and drops {@code server} from
-  * cachedServers. Returns immediately if the server is not in cachedServers.
-  *
-  * @param server server address as produced by HServerAddress.toString()
-  */
- protected void clearCachedLocationForServer(final String server) {
-   boolean removedAny = false;
-   synchronized (this.cachedRegionLocations) {
-     if (!cachedServers.contains(server)) {
-       return;
-     }
-     for (Map<byte[], HRegionLocation> perTable : cachedRegionLocations.values()) {
-       for (Entry<byte[], HRegionLocation> cached : perTable.entrySet()) {
-         String host = cached.getValue().getServerAddress().toString();
-         if (host.equals(server)) {
-           perTable.remove(cached.getKey());
-           removedAny = true;
-         }
-       }
-     }
-     cachedServers.remove(server);
-   }
-   if (removedAny && LOG.isDebugEnabled()) {
-     LOG.debug("Removed all cached region locations that map to " + server);
-   }
- }
-
- /*
-  * Returns the map of cached locations for {@code tableName}, keyed by
-  * region start key, and creates an empty one on first use. Creation is
-  * done under the cachedRegionLocations lock, with a second lookup so
-  * concurrent callers end up sharing one map.
-  *
-  * @param tableName table name
-  * @return map of cached locations for the table; never null
-  */
- private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
-     final byte[] tableName) {
-   Integer key = Bytes.mapKey(tableName);
-   ConcurrentSkipListMap<byte[], HRegionLocation> existing =
-       this.cachedRegionLocations.get(key);
-   if (existing != null) {
-     return existing;
-   }
-   synchronized (this.cachedRegionLocations) {
-     // Look again under the lock: another thread may have created it.
-     existing = this.cachedRegionLocations.get(key);
-     if (existing == null) {
-       existing = new ConcurrentSkipListMap<byte[], HRegionLocation>(
-           Bytes.BYTES_COMPARATOR);
-       this.cachedRegionLocations.put(key, existing);
-     }
-     return existing;
-   }
- }
-
- /**
-  * Discards every cached region location and the set of cached servers,
-  * holding the cachedRegionLocations lock while doing so.
-  */
- public void clearRegionCache() {
-   synchronized (this.cachedRegionLocations) {
-     cachedServers.clear();
-     cachedRegionLocations.clear();
-   }
- }
-
- /**
-  * Stores {@code location} in the cache of {@code tableName}, keyed by the
-  * region's start key, and records its server in cachedServers. Logs at
-  * debug level only when no entry existed before for that start key.
-  */
- private void cacheLocation(final byte [] tableName,
-     final HRegionLocation location) {
-   byte [] regionStart = location.getRegionInfo().getStartKey();
-   HRegionLocation previous;
-   synchronized (this.cachedRegionLocations) {
-     cachedServers.add(location.getServerAddress().toString());
-     previous = getTableLocations(tableName).put(regionStart, location);
-   }
-   if (previous == null) {
-     LOG.debug("Cached location for " +
-         location.getRegionInfo().getRegionNameAsString() +
-         " is " + location.getServerAddress().toString());
-   }
- }
-
- /**
-  * Returns an RPC client for {@code regionServer}.
-  *
-  * If HRegionServer.getMainRS returns an instance for the address and Thrift
-  * is not in use, that instance is returned directly. Otherwise a Thrift or
-  * Hadoop RPC client is built. The choice depends on useThrift and on
-  * whether META stores the Thrift port
-  * (REGION_SERVER_WRITE_THRIFT_INFO_TO_META). A failed Thrift connection
-  * falls back to Hadoop RPC.
-  *
-  * @param regionServer address of the region server
-  * @param getMaster if true, getMaster() is called first
-  * @param options RPC options for the underlying client
-  * @return a client for the region server
-  * @throws IOException if the client cannot be created; RemoteExceptions
-  *         are decoded before being rethrown
-  */
- @Override
- public HRegionInterface getHRegionConnection(
-     HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
-     throws IOException {
-   if (getMaster) {
-     LOG.debug("Getting master");
-     getMaster();
-   }
-   HRegionInterface server = HRegionServer.getMainRS(regionServer);
-   if (server != null && !this.useThrift) {
-     return server;
-   }
-
-   final boolean thriftPortWrittenToMeta = conf.getBoolean(
-       HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
-       HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT);
-
-   try {
-     // establish an RPC for this RS
-     // set hbase.ipc.client.connect.max.retries to retry connection
-     // attempts
-     if (this.useThrift) {
-       Class<? extends ThriftClientInterface> serverInterface =
-           ThriftHRegionInterface.class;
-       if (thriftPortWrittenToMeta) {
-         // META holds the thrift port: connect to the address as given.
-         try {
-           server = (HRegionInterface) HBaseThriftRPC.getClient(
-               regionServer.getInetSocketAddress(), this.conf,
-               serverInterface, options);
-         } catch (Exception e) {
-           LOG.warn("Exception connecting to the region server on " +
-               "the thrift channel. Retrying on the HadoopRPC port", e);
-           server = createHadoopRpcProxy(hadoopRpcAddress(regionServer), options);
-         }
-       } else {
-         // META holds the hadoop port: connect to the configured swift port.
-         try {
-           InetSocketAddress addr = new InetSocketAddress(regionServer
-               .getInetSocketAddress().getHostName(), conf.getInt(
-               HConstants.REGIONSERVER_SWIFT_PORT,
-               HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT));
-           server = (HRegionInterface) HBaseThriftRPC.getClient(addr,
-               this.conf, serverInterface, options);
-         } catch (Exception e) {
-           LOG.warn("Exception connecting to the region server on " +
-               "the thrift channel. Retrying on the HadoopRPC port", e);
-           server = createHadoopRpcProxy(regionServer.getInetSocketAddress(),
-               options);
-         }
-       }
-     } else if (thriftPortWrittenToMeta) {
-       // The hadoop port is no longer written to Meta (this will happen
-       // when we are reasonably confident about the thrift service, and
-       // will soon deprecate RPC). So, try to connect to the default port.
-       // TODO gauravm: Verify that this works (t2830553)
-       server = createHadoopRpcProxy(hadoopRpcAddress(regionServer), options);
-     } else {
-       server = createHadoopRpcProxy(regionServer.getInetSocketAddress(),
-           options);
-     }
-   } catch (RemoteException e) {
-     throw RemoteExceptionHandler.decodeRemoteException(e);
-   }
-
-   return server;
- }
-
- /**
-  * Returns the Hadoop RPC endpoint on {@code regionServer}'s host, using the
-  * port configured under HConstants.REGIONSERVER_PORT.
-  */
- private InetSocketAddress hadoopRpcAddress(HServerAddress regionServer) {
-   return new InetSocketAddress(regionServer.getInetSocketAddress()
-       .getHostName(), conf.getInt(HConstants.REGIONSERVER_PORT,
-       HConstants.DEFAULT_REGIONSERVER_PORT));
- }
-
- /**
-  * Creates a Hadoop RPC proxy of serverInterfaceClass for {@code addr}.
-  */
- private HRegionInterface createHadoopRpcProxy(InetSocketAddress addr,
-     HBaseRPCOptions options) throws IOException {
-   return (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
-       HBaseRPCProtocolVersion.versionID, addr, this.conf,
-       params.getRpcTimeout(), options);
- }
-
- /**
-  * Returns a client for {@code regionServer} using the given RPC options,
-  * without calling getMaster() first.
-  */
- public HRegionInterface getHRegionConnection(HServerAddress regionServer,
-     HBaseRPCOptions options) throws IOException {
-   final boolean contactMaster = false;
-   return getHRegionConnection(regionServer, contactMaster, options);
- }
-
- /**
-  * Returns a client for {@code regionServer} using HBaseRPCOptions.DEFAULT.
-  */
- public HRegionInterface getHRegionConnection(HServerAddress regionServer,
-     boolean getMaster) throws IOException {
-   HBaseRPCOptions defaults = HBaseRPCOptions.DEFAULT;
-   return getHRegionConnection(regionServer, getMaster, defaults);
- }
-
- /**
-  * Returns a client for {@code regionServer} with default options and
-  * without calling getMaster() first.
-  */
- public HRegionInterface getHRegionConnection(HServerAddress regionServer)
-     throws IOException {
-   final boolean contactMaster = false;
-   return getHRegionConnection(regionServer, contactMaster);
- }
-
- /**
-  * Returns the ZooKeeperWrapper of the client ZK connection obtained from
-  * HConnectionManager.getClientZKConnection for this connection's conf.
-  */
- public synchronized ZooKeeperWrapper getZooKeeperWrapper()
-     throws IOException {
-   return HConnectionManager.getClientZKConnection(conf).getZooKeeperWrapper();
- }
-
- /*
-  * Finds the ROOT region: reads its address from ZooKeeper, then checks it
-  * with a getRegionInfo call against that server.
-  *
-  * Up to params.getNumRetries() outer attempts are made. In each one,
-  * ZooKeeper is polled up to params.getNumRetries() times (sleeping
-  * params.getPauseTime(tries) between polls) until an address appears, and
-  * the server is then probed.
-  *
-  * @return HRegionLocation for the ROOT region if found
-  *
-  * @throws NoServerForRegionException if ZooKeeper publishes no address
-  * within one outer attempt, or if the probe on the last attempt fails
-  * (only the failure's message is kept, not its cause)
-  *
-  * @throws IOException e.g. from getZooKeeperWrapper()
-  */
- private HRegionLocation locateRootRegion() throws IOException {
-
- // We lazily instantiate the ZooKeeper object because we don't want to
- // make the constructor have to throw IOException or handle it itself.
- ZooKeeperWrapper zk = getZooKeeperWrapper();
-
- HServerAddress rootRegionAddress = null;
- for (int tries = 0; tries < params.getNumRetries(); tries++) {
- int localTimeouts = 0;
- // Poll ZooKeeper for the ROOT region's address.
- while (rootRegionAddress == null
- && localTimeouts < params.getNumRetries()) {
- // NOTE(review): no safe-mode check happens here; this simply reads the
- // published ROOT location from ZooKeeper.
- rootRegionAddress = zk.readRootRegionLocation();
- if (rootRegionAddress == null) {
- try {
- if (LOG.isDebugEnabled()) {
- // NOTE(review): the pause is keyed on the outer `tries`, not on
- // localTimeouts, so it stays constant within one polling loop.
- LOG.debug("Sleeping " + params.getPauseTime(tries)
- + "ms, waiting for root region.");
- }
- Thread.sleep(params.getPauseTime(tries));
- } catch (InterruptedException iex) {
- // continue (interrupt status is not restored)
- }
- localTimeouts++;
- }
- }
-
- if (rootRegionAddress == null) {
- throw new NoServerForRegionException(
- "Timed out trying to locate root region");
- }
-
- LOG.debug("Trying to get root region at : " + rootRegionAddress);
- try {
- // Get a connection to the region server
- HRegionInterface server = getHRegionConnection(rootRegionAddress);
- // if this works, then we're good, and we have an acceptable address,
- // so we can stop doing retries and return the result.
- server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found ROOT at " + rootRegionAddress);
- }
- break;
- } catch (Throwable t) {
- t = translateException(t);
-
- // Last attempt: give up. Only t's message is kept, not t itself.
- if (tries == params.getNumRetries() - 1) {
- throw new NoServerForRegionException("Timed out trying to locate "+
- "root region because: " + t.getMessage());
- }
-
- // Sleep and retry finding root region.
- try {
- LOG.debug("Root region location changed. Sleeping.", t);
- Thread.sleep(params.getPauseTime(tries));
- LOG.debug("Wake. Retry finding root region.");
- } catch (InterruptedException iex) {
- // continue (interrupt status is not restored)
- }
- }
-
- // Reset so the next attempt re-reads the address from ZooKeeper.
- rootRegionAddress = null;
- }
-
- // if the address is null by this point, then the retries have failed,
- // and we're sort of sunk. Each loop iteration either breaks with a
- // non-null address or throws, so this is reached with null only when
- // params.getNumRetries() <= 0.
- if (rootRegionAddress == null) {
- throw new NoServerForRegionException(
- "unable to locate root region server");
- }
-
- // return the region location
- return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
- }
-
- /**
-  * Runs {@code callable} against its region server, retrying on failure.
-  *
-  * <ul>
-  * <li>DoNotRetryIOException, ClientSideDoNotRetryException and
-  * PreemptiveFastFailException are rethrown at once. For a
-  * DoNotRetryIOException caused by NotServingRegionException, the cached
-  * location is dropped first.</li>
-  * <li>RegionOverloadedException: wait until the server-requested backoff,
-  * counted from callStartTime, has passed, then restart with tries reset
-  * to 0. Give up after params.getMaxServerRequestedRetries() such waits.</li>
-  * <li>Any other throwable: drop the cached location and re-resolve it.
-  * If the region moved, retry at once. Otherwise sleep
-  * params.getPauseTime(tries), bounded by params.getRpcRetryTimeout().
-  * At most params.getNumRetries() attempts are made.</li>
-  * </ul>
-  *
-  * @throws RetriesExhaustedException when attempts or the retry timeout run out
-  * @throws InterruptedIOException if interrupted while sleeping
-  */
- @Override
- public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
-     throws IOException {
-   List<Throwable> exceptions = new ArrayList<Throwable>();
-   RegionOverloadedException roe = null;
-
-   int serverRequestedRetries = 0;
-   long callStartTime = System.currentTimeMillis();
-   long serverRequestedWaitTime = 0;
-   // do not retry if region cannot be located. There are enough retries
-   // within instantiateRegionLocation.
-   callable.instantiateRegionLocation(false /* reload cache? */);
-
-   for (int tries = 0;; tries++) {
-     // If the server requested a wait, wait that long and start again.
-     // This time and these tries do not count against the client retries.
-     if (serverRequestedWaitTime > 0) {
-       serverRequestedRetries++;
-
-       if (serverRequestedRetries > params.getMaxServerRequestedRetries()) {
-         throw RegionOverloadedException.create(roe, exceptions,
-             serverRequestedWaitTime);
-       }
-
-       long pauseTime = serverRequestedWaitTime + callStartTime
-           - System.currentTimeMillis();
-       LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for "
-           + pauseTime + "ms. serverRequestedRetries = "
-           + serverRequestedRetries);
-       // The failed call may already have outlasted the requested backoff,
-       // leaving pauseTime <= 0. Thread.sleep rejects negative values.
-       if (pauseTime > 0) {
-         try {
-           Thread.sleep(pauseTime);
-         } catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-           throw new InterruptedIOException();
-         }
-       }
-
-       serverRequestedWaitTime = 0;
-       tries = 0;
-       callStartTime = System.currentTimeMillis();
-     }
-
-     try {
-       return getRegionServerWithoutRetries(callable, false);
-     } catch (DoNotRetryIOException ioe) {
-       // clear cache if needed
-       if (ioe.getCause() instanceof NotServingRegionException) {
-         HRegionLocation prevLoc = callable.location;
-         if (prevLoc.getRegionInfo() != null) {
-           deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
-               .getStartKey(), prevLoc.getServerAddress());
-         }
-       }
-
-       // If we are not supposed to retry; Let it pass through.
-       throw ioe;
-     } catch (RegionOverloadedException ex) {
-       roe = ex;
-       serverRequestedWaitTime = roe.getBackoffTimeMillis();
-       continue;
-     } catch (ClientSideDoNotRetryException exp) {
-       // Bail out of the retry loop, immediately
-       throw exp;
-     } catch (PreemptiveFastFailException pfe) {
-       // Bail out of the retry loop, if the host has been consistently
-       // unreachable.
-       throw pfe;
-     } catch (Throwable t) {
-       exceptions.add(t);
-
-       if (tries == params.getNumRetries() - 1) {
-         throw new RetriesExhaustedException(callable.getServerName(),
-             callable.getRegionName(), callable.getRow(), tries, exceptions);
-       }
-
-       HRegionLocation prevLoc = callable.location;
-       if (prevLoc.getRegionInfo() != null) {
-         deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
-             .getStartKey(), prevLoc.getServerAddress());
-       }
-
-       try {
-         // do not retry if getting the location throws exception
-         callable.instantiateRegionLocation(false /* reload cache ? */);
-       } catch (IOException e) {
-         exceptions.add(e);
-         throw new RetriesExhaustedException(callable.getServerName(),
-             callable.getRegionName(), callable.getRow(), tries, exceptions);
-       }
-       if (prevLoc.getServerAddress().equals(
-           callable.location.getServerAddress())) {
-         // Bail out of the retry loop if we have to wait too long
-         long pauseTime = params.getPauseTime(tries);
-         if ((System.currentTimeMillis() - callStartTime + pauseTime) > params
-             .getRpcRetryTimeout()) {
-           throw new RetriesExhaustedException(callable.getServerName(),
-               callable.getRegionName(), callable.getRow(), tries,
-               exceptions);
-         }
-         LOG.debug("getRegionServerWithRetries failed, sleeping for "
-             + pauseTime + "ms. tries = " + tries, t);
-         try {
-           Thread.sleep(pauseTime);
-         } catch (InterruptedException e) {
-           Thread.currentThread().interrupt();
-           throw new InterruptedIOException();
-         }
-         // do not reload cache. While we were sleeping hopefully the cache
-         // has been re-populated.
-         callable.instantiateRegionLocation(false);
-       } else {
-         LOG.debug("getRegionServerWithRetries failed, "
-             + "region moved from " + prevLoc + " to " + callable.location
-             + " retrying immediately tries=" + tries, t);
-       }
-     }
-   }
- }
-
- /**
-  * Runs {@code callable} once against its region server, applying the
-  * fast-fail policy.
-  *
-  * If the target server is in fast-fail mode and this thread is not already
-  * retrying in that mode, only the one thread that wins
-  * shouldRetryInspiteOfFastFail proceeds. Every other thread gets a
-  * PreemptiveFastFailException. Failure bookkeeping is always updated in
-  * the finally block.
-  *
-  * @param <T> the type of the return value
-  * @param callable callable to run
-  * @param instantiateRegionLocation whether to resolve the region location
-  *        (from cache) before calling
-  * @return the callable's result
-  * @throws IOException if a remote or network exception occurs
-  * @throws RuntimeException other unspecified error
-  * @throws PreemptiveFastFailException if the remote host has been known to
-  *         be unreachable for more than this.fastFailThresholdMilliSec
-  */
- public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
-     boolean instantiateRegionLocation) throws IOException,
-     RuntimeException, PreemptiveFastFailException {
-   HServerAddress target = null;
-   FailureInfo failures = null;
-   boolean attempted = false;
-   boolean exclusiveRetry = false;
-   MutableBoolean unreachable = new MutableBoolean(false);
-   try {
-     if (instantiateRegionLocation) {
-       callable.instantiateRegionLocation(false);
-     }
-     target = callable.getServerAddress();
-     failures = repeatedFailuresMap.get(target);
-
-     boolean gated = inFastFailMode(target) && !currentThreadInFastFailMode();
-     if (gated) {
-       exclusiveRetry = shouldRetryInspiteOfFastFail(failures);
-       if (!exclusiveRetry) {
-         throw new PreemptiveFastFailException(
-             failures.numConsecutiveFailures.get(),
-             failures.timeOfFirstFailureMilliSec,
-             failures.timeOfLatestAttemptMilliSec, target.getHostname());
-       }
-     }
-
-     attempted = true;
-     callable.instantiateServer();
-     return callable.call();
-   } catch (PreemptiveFastFailException pfe) {
-     throw pfe;
-   } catch (ClientSideDoNotRetryException exp) {
-     throw exp;
-   } catch (Throwable t) {
-     // handleThrowable always throws; the return only satisfies the compiler.
-     handleThrowable(t, callable, unreachable);
-     return null;
-   } finally {
-     updateFailureInfoForServer(target, failures, attempted,
-         unreachable.booleanValue(), exclusiveRetry);
-   }
- }
-
- /**
-  * Runs {@code callable} once, resolving its region location first.
-  */
- @Override
- public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
-     throws IOException, RuntimeException, PreemptiveFastFailException {
-   final boolean resolveLocationFirst = true;
-   return this.getRegionServerWithoutRetries(callable, resolveLocationFirst);
- }
-
- /**
-  * When recordClientContext is enabled, appends an OperationContext for
-  * {@code callable}'s location and {@code t} to the current thread's list
-  * in operationContextPerThread, creating the list on first use.
-  */
- private <T> void updateClientContext(final ServerCallable<T> callable,
-     final Throwable t) {
-   if (recordClientContext) {
-     List<OperationContext> contexts = this.operationContextPerThread.get();
-     if (contexts == null) {
-       contexts = new ArrayList<OperationContext>();
-       this.operationContextPerThread.set(contexts);
-     }
-     contexts.add(new OperationContext(callable.location, t));
-   }
- }
-
- /**
-  * Records a failure to communicate with {@code server}.
-  *
-  * Creates or updates the server's FailureInfo in repeatedFailuresMap (time
-  * of latest attempt, consecutive-failure count) and clears cached region
-  * locations that point at the server. In fast-fail mode the cache is
-  * cleared at most once per cacheClearingTimeoutMilliSec, and the failure
-  * is logged at error level. Does nothing if either argument is null. This
-  * method does not throw.
-  *
-  * @param server the server that could not be reached
-  * @param t the throwable that was encountered
-  */
- private void handleFailureToServer(HServerAddress server, Throwable t) {
-   if (server == null || t == null) {
-     return;
-   }
-
-   long now = System.currentTimeMillis();
-   FailureInfo info = repeatedFailuresMap.get(server);
-   if (info == null) {
-     FailureInfo fresh = new FailureInfo(now);
-     FailureInfo raced = repeatedFailuresMap.putIfAbsent(server, fresh);
-     // Another thread may have registered this server concurrently.
-     info = (raced != null) ? raced : fresh;
-   }
-   info.timeOfLatestAttemptMilliSec = now;
-   info.numConsecutiveFailures.incrementAndGet();
-
-   boolean fastFail = inFastFailMode(server);
-   // Outside fast-fail mode the cache is always cleared; inside it, only if
-   // the last clear is older than cacheClearingTimeoutMilliSec.
-   boolean clearDue = !fastFail
-       || now > info.timeOfLatestCacheClearMilliSec + cacheClearingTimeoutMilliSec;
-   if (clearDue) {
-     info.timeOfLatestCacheClearMilliSec = now;
-     clearCachedLocationForServer(server.toString());
-   }
-   if (fastFail) {
-     LOG.error("Exception in FastFail mode : " + t.toString());
-   }
- }
-
- /**
-  * Prunes repeatedFailuresMap at most once per
-  * failureMapCleanupIntervalMilliSec.
-  *
-  * An entry is removed when its latest attempt is older than the cleanup
-  * interval, or when it has been failing for longer than
-  * fastFailClearingTimeMilliSec (logged at error level). The remaining
-  * entries are logged as one warning. No lock is taken: repeated removals
-  * from the concurrent map by racing threads are harmless.
-  */
- private void occasionallyCleanupFailureInformation() {
-   long now = System.currentTimeMillis();
-   boolean due = now > lastFailureMapCleanupTimeMilliSec
-       + failureMapCleanupIntervalMilliSec;
-   if (!due) {
-     return;
-   }
-
-   StringBuilder stillFailing = new StringBuilder();
-   for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
-     HServerAddress host = entry.getKey();
-     FailureInfo info = entry.getValue();
-     if (now > info.timeOfLatestAttemptMilliSec
-         + failureMapCleanupIntervalMilliSec) {
-       // No recent failures: forget this server.
-       repeatedFailuresMap.remove(host);
-     } else if (now > info.timeOfFirstFailureMilliSec
-         + this.fastFailClearingTimeMilliSec) {
-       // Been failing for a long time: reset its state.
-       LOG.error(host
-           + " been failing for a long time. clearing out."
-           + info.toString());
-       repeatedFailuresMap.remove(host);
-     } else {
-       stillFailing.append(host.toString() + " failing "
-           + info.toString() + "\n");
-     }
-   }
-
-   // Re-check the shared timestamp so that, when several threads clean up
-   // at once, ideally only one of them logs.
-   boolean shouldLog = stillFailing.length() > 0
-       && now > this.lastFailureMapCleanupTimeMilliSec
-           + this.failureMapCleanupIntervalMilliSec;
-   if (shouldLog) {
-     LOG.warn("Preemptive failure enabled for : " + stillFailing.toString());
-   }
-   lastFailureMapCleanupTimeMilliSec = now;
- }
-
- /**
-  * Tells whether requests to {@code server} are in fast-fail mode, i.e.
-  * whether the server has had recorded failures for longer than
-  * fastFailThresholdMilliSec.
-  *
-  * @param server server to check
-  * @return true if the client is in fast fail mode for the server
-  */
- private boolean inFastFailMode(HServerAddress server) {
-   FailureInfo info = repeatedFailuresMap.get(server);
-   if (info == null) {
-     // No recorded failures: the server is considered healthy.
-     return false;
-   }
-   long now = System.currentTimeMillis();
-   return now > info.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec;
- }
-
- /**
-  * Tells whether the current thread is already the designated fast-fail
-  * retrier for *some* server, per its threadRetryingInFastFailMode flag.
-  *
-  * @return true if this thread's flag is set
-  */
- private boolean currentThreadInFastFailMode() {
-   MutableBoolean flag = this.threadRetryingInFastFailMode.get();
-   return flag != null && flag.booleanValue();
- }
-
- /**
-  * Decides whether this thread should still try to reach a server that is
-  * in fast-fail mode.
-  *
-  * Only one thread should actively try to reconnect while the others short
-  * circuit. The thread that flips fInfo's exclusive-retry flag from false to
-  * true wins, and its thread-local threadRetryingInFastFailMode flag is set.
-  *
-  * @param fInfo failure information for the server; null yields false
-  * @return true if this thread should try to connect to the server
-  */
- private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
-   if (fInfo == null) {
-     return false;
-   }
-   boolean won = fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false,
-       true);
-   if (!won) {
-     return false;
-   }
-   MutableBoolean flag = this.threadRetryingInFastFailMode.get();
-   if (flag == null) {
-     flag = new MutableBoolean();
-     this.threadRetryingInFastFailMode.set(flag);
-   }
-   flag.setValue(true);
-   return true;
- }
-
- /**
-  * Updates the failure information for {@code server} after an attempt.
-  *
-  * On success the server's entry is removed from repeatedFailuresMap. On
-  * failure its latest-attempt time is updated. In both cases, if this
-  * thread held the exclusive fast-fail retry slot, the slot and the
-  * thread-local flag are released. Does nothing when server or fInfo is
-  * null or no attempt was made.
-  *
-  * @param server server that was contacted
-  * @param fInfo failure information for the server, as read before the attempt
-  * @param didTry whether a call to the server was actually attempted
-  * @param couldNotCommunicate whether the attempt failed to reach the server
-  * @param retryDespiteFastFailMode whether this thread took the exclusive
-  *        fast-fail retry slot for this attempt
-  */
- private void updateFailureInfoForServer(HServerAddress server,
-     FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
-     boolean retryDespiteFastFailMode) {
-   if (server == null || fInfo == null || !didTry) {
-     return;
-   }
-
-   if (!couldNotCommunicate) {
-     // We were able to connect to the server: reset its failure information.
-     LOG.info("Clearing out PFFE for server " + server.getHostname());
-     repeatedFailuresMap.remove(server);
-   } else {
-     // update time of last attempt
-     fInfo.timeOfLatestAttemptMilliSec = System.currentTimeMillis();
-   }
-
-   // Release the exclusive retry slot whatever the outcome. If it were
-   // released only on failure, a successful retry would leave this thread's
-   // flag set for good. currentThreadInFastFailMode() would then stay true
-   // and the thread would bypass fast-fail gating for every server.
-   if (retryDespiteFastFailMode) {
-     fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
-     threadRetryingInFastFailMode.get().setValue(false);
-   }
-
-   occasionallyCleanupFailureInformation();
- }
-
- /**
-  * Records the outcome of an attempt to contact {@code server}.
-  *
-  * If the server is in fast-fail mode and this thread is not already marked
-  * as retrying, this first tries to become the single designated retrier.
-  * It then delegates to the private overload with the result.
-  *
-  * @param server the server that was contacted
-  * @param didTry whether an attempt was actually made
-  * @param couldNotCommunicate true if the attempt failed to reach the server
-  */
- public void updateFailureInfoForServer(HServerAddress server,
-     boolean didTry, boolean couldNotCommunicate) {
-   FailureInfo info = repeatedFailuresMap.get(server);
-   // Short-circuit order matters: the compare-and-set inside
-   // shouldRetryInspiteOfFastFail runs only when the server is in fast-fail
-   // mode and this thread is not already the retrier.
-   boolean chosenRetrier = inFastFailMode(server)
-       && !currentThreadInFastFailMode()
-       && shouldRetryInspiteOfFastFail(info);
-   updateFailureInfoForServer(server, info, didTry, couldNotCommunicate,
-       chosenRetrier);
- }
-
- /**
-  * Translates a failure raised by {@code callable} and rethrows it.
-  *
-  * Before rethrowing, it flags locally raised network failures and reports
-  * them against the callable's server, then updates the client context. It
-  * never returns normally. An {@link IOException} produced by the
-  * translation is rethrown as-is; anything else is wrapped in a
-  * {@link RuntimeException}.
-  *
-  * @param t1 the throwable raised while running {@code callable}
-  * @param callable the call that failed
-  * @param couldNotCommunicateWithServer set to true when the translated
-  *        failure is a local network exception
-  * @throws IOException the translated exception, when it is an IOException
-  */
- public void handleThrowable(Throwable t1, ServerCallable<?> callable,
-     MutableBoolean couldNotCommunicateWithServer)
-     throws IOException {
-   // NOTE(review): per the original comment, translateException may itself
-   // throw (a DoNotRetryIOException or a non-IOException).
-   Throwable translated = translateException(t1);
-
-   // Only exceptions raised on this side (not RemoteException) can indicate
-   // that the server was unreachable.
-   boolean raisedLocally = !(translated instanceof RemoteException);
-   if (raisedLocally && isNetworkException(translated)) {
-     couldNotCommunicateWithServer.setValue(true);
-     handleFailureToServer(callable.getServerAddress(), translated);
-   }
-
-   updateClientContext(callable, translated);
-   if (!(translated instanceof IOException)) {
-     throw new RuntimeException(translated);
-   }
-   throw (IOException) translated;
- }
-
- /**
-  * Builds a task that asks the region server at {@code address} for its
-  * start code through {@code getRegionServerWithoutRetries}.
-  */
- private Callable<Long> createGetServerStartCodeCallable(
-     final HServerAddress address, final HBaseRPCOptions options) {
-   final HConnection conn = this;
-   return new Callable<Long>() {
-     @Override
-     public Long call() throws IOException {
-       // The RPC wrapper is created per invocation, not when the task is built.
-       ServerCallableForBatchOps<Long> rpc =
-           new ServerCallableForBatchOps<Long>(conn, address, options) {
-             @Override
-             public Long call() throws IOException {
-               return server.getStartCode();
-             }
-           };
-       return getRegionServerWithoutRetries(rpc);
-     }
-   };
- }
-
- /**
-  * Builds a task that asks the region server at {@code address} for its
-  * current time in milliseconds through
-  * {@code getRegionServerWithoutRetries}.
-  */
- private Callable<Long> createCurrentTimeCallable(
-     final HServerAddress address, final HBaseRPCOptions options) {
-   final HConnection conn = this;
-   return new Callable<Long>() {
-     @Override
-     public Long call() throws IOException {
-       // The RPC wrapper is created per invocation, not when the task is built.
-       ServerCallableForBatchOps<Long> rpc =
-           new ServerCallableForBatchOps<Long>(conn, address, options) {
-             @Override
-             public Long call() throws IOException {
-               return server.getCurrentTimeMillis();
-             }
-           };
-       return getRegionServerWithoutRetries(rpc);
-     }
-   };
- }
-
- /**
-  * Builds a task that fetches the last flush times reported by the region
-  * server at {@code address} through {@code getRegionServerWithoutRetries}.
-  */
- private Callable<MapWritable> createGetLastFlushTimesCallable(
-     final HServerAddress address, final HBaseRPCOptions options) {
-   final HConnection conn = this;
-   return new Callable<MapWritable>() {
-     @Override
-     public MapWritable call() throws IOException {
-       // The RPC wrapper is created per invocation, not when the task is built.
-       ServerCallableForBatchOps<MapWritable> rpc =
-           new ServerCallableForBatchOps<MapWritable>(conn, address, options) {
-             @Override
-             public MapWritable call() throws IOException {
-               return server.getLastFlushTimes();
-             }
-           };
-       return getRegionServerWithoutRetries(rpc);
-     }
-   };
- }
-
- /**
-  * Builds a task that asks the region server at {@code address} to flush
-  * {@code region} with the given target flush time. The RPC goes through
-  * {@code getRegionServerWithoutRetries}, and the task yields null.
-  */
- private Callable<Void> createFlushCallable(final HServerAddress address,
-     final HRegionInfo region, final long targetFlushTime,
-     final HBaseRPCOptions options) {
-   final HConnection conn = this;
-   return new Callable<Void>() {
-     @Override
-     public Void call() throws IOException {
-       // The region name is read when the task runs, not when it is built.
-       ServerCallableForBatchOps<Void> rpc =
-           new ServerCallableForBatchOps<Void>(conn, address, options) {
-             @Override
-             public Void call() throws IOException {
-               server.flushRegion(region.getRegionName(), targetFlushTime);
-               return null;
-             }
-           };
-       return getRegionServerWithoutRetries(rpc);
-     }
-   };
- }
-
- /**
-  * Builds a task that sends {@code multi} to the region server at
-  * {@code address}. It calls {@code getRegionServerWithoutRetries} with its
-  * second argument set to true.
-  *
-  * {@code tableName} is accepted but not used here. Mutation tracking is
-  * done by the caller.
-  */
- private Callable<MultiResponse> createMultiActionCallable(
-     final HServerAddress address, final MultiAction multi,
-     final byte[] tableName, final HBaseRPCOptions options) {
-   final HConnection conn = this;
-   return new Callable<MultiResponse>() {
-     @Override
-     public MultiResponse call() throws IOException {
-       ServerCallableForBatchOps<MultiResponse> rpc =
-           new ServerCallableForBatchOps<MultiResponse>(conn, address, options) {
-             @Override
-             public MultiResponse call() throws IOException,
-                 InterruptedException, ExecutionException {
-               return server.multiAction(multi);
-             }
-           };
-       return getRegionServerWithoutRetries(rpc, true);
-     }
-   };
- }
-
- /**
-  * Looks up the location of the region holding {@code rowKey}, making at
-  * most {@code params.getNumRetries()} attempts.
-  *
-  * The first attempt passes {@code reload} to {@code getRegionLocation};
-  * every later attempt passes true. Between attempts the thread sleeps for
-  * {@code params.getPauseTime(tries)}. No sleep follows the last attempt.
-  *
-  * @param tableName table to look up
-  * @param rowKey row whose region location is wanted
-  * @param reload reload flag used for the first lookup only
-  * @return the region location, never null
-  * @throws RetriesExhaustedException if no location was obtained; it
-  *         carries every throwable seen during the attempts
-  * @throws java.io.InterruptedIOException if the thread is interrupted while
-  *         pausing between attempts; the interrupt status is restored
-  */
- private HRegionLocation getRegionLocationForRowWithRetries(
-     byte[] tableName, byte[] rowKey, boolean reload) throws IOException {
-   boolean reloadFlag = reload;
-   List<Throwable> exceptions = new ArrayList<Throwable>();
-   HRegionLocation location = null;
-   int tries = 0;
-   while (tries < params.getNumRetries()) {
-     try {
-       location = getRegionLocation(tableName, rowKey, reloadFlag);
-     } catch (Throwable t) {
-       exceptions.add(t);
-     }
-     if (location != null) {
-       break;
-     }
-     reloadFlag = true;
-     tries++;
-     if (tries >= params.getNumRetries()) {
-       // Out of attempts: report the failure without an extra pause.
-       break;
-     }
-     try {
-       Thread.sleep(params.getPauseTime(tries));
-     } catch (InterruptedException e) {
-       // Preserve the interrupt and stop retrying. Swallowing it would clear
-       // the interrupt status and hide the request to stop from callers.
-       Thread.currentThread().interrupt();
-       java.io.InterruptedIOException iie = new java.io.InterruptedIOException(
-           "Interrupted while locating region for row "
-               + Bytes.toStringBinary(rowKey) + " in table "
-               + Bytes.toString(tableName));
-       iie.initCause(e);
-       throw iie;
-     }
-   }
-   if (location == null) {
-     throw new RetriesExhaustedException(
-         " -- nothing found, no 'location' returned," + " tableName="
-             + Bytes.toString(tableName) + ", reload=" + reload + " --",
-         HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions);
-   }
-   return location;
- }
-
- /**
-  * Groups the rows of {@code workingList} into one {@link MultiAction} per
-  * region server, using {@code locateRegion(tableName, row, true)}.
-  *
-  * Null entries are skipped but still consume an index, so the index recorded
-  * with each get is its position in {@code workingList}. For mutations, each
-  * row's location is also passed to {@code trackMutationsToTable}.
-  *
-  * @param workingList the gets or mutations to route
-  * @param tableName table the rows belong to
-  * @param isGets true if every non-null element is a {@link Get}; otherwise
-  *        each is cast to {@link Mutation}
-  * @return the batched actions keyed by server address
-  * @throws IOException if a region cannot be located
-  */
- private <R extends Row> Map<HServerAddress, MultiAction> splitRequestsByRegionServer(
-     List<R> workingList, final byte[] tableName, boolean isGets)
-     throws IOException {
-   Map<HServerAddress, MultiAction> grouped = new HashMap<HServerAddress, MultiAction>();
-   int size = workingList.size();
-   for (int idx = 0; idx < size; idx++) {
-     Row row = workingList.get(idx);
-     if (row == null) {
-       continue;
-     }
-     HRegionLocation location = locateRegion(tableName, row.getRow(), true);
-     byte[] regionName = location.getRegionInfo().getRegionName();
-     HServerAddress serverAddress = location.getServerAddress();
-
-     MultiAction batch = grouped.get(serverAddress);
-     if (batch == null) {
-       batch = new MultiAction();
-       grouped.put(serverAddress, batch);
-     }
-
-     if (!isGets) {
-       trackMutationsToTable(tableName, location);
-       batch.mutate(regionName, (Mutation) row);
-     } else {
-       batch.addGet(regionName, (Get) row, idx);
-     }
-   }
-   return grouped;
- }
-
- /**
-  * Starts one multi-action request per region server and returns the
-  * pending results keyed by server address.
-  *
-  * With exactly one target server the request runs synchronously in the
-  * calling thread as a {@link FutureTask}. Otherwise each request is
-  * submitted to {@code pool}.
-  */
- private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
-     Map<HServerAddress, MultiAction> actionsByServer,
-     final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
-   Map<HServerAddress, Future<MultiResponse>> pending =
-       new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
-
-   // A single target gains nothing from the pool, so run it inline.
-   boolean runInline = actionsByServer.size() == 1;
-   for (Entry<HServerAddress, MultiAction> entry : actionsByServer.entrySet()) {
-     HServerAddress target = entry.getKey();
-     Callable<MultiResponse> request = createMultiActionCallable(
-         target, entry.getValue(), tableName, options);
-     Future<MultiResponse> future;
-     if (runInline) {
-       FutureTask<MultiResponse> inline = new FutureTask<MultiResponse>(request);
-       inline.run();
-       future = inline;
-     } else {
-       future = pool.submit(request);
-     }
-     pending.put(target, future);
-   }
-   return pending;
- }
-
- /**
-  * Waits for each region server's multi-action response and collects the
-  * mutations that must be retried.
-  *
-  * A future that failed with a {@link DoNotRetryIOException} cause aborts
-  * the whole batch. Any other execution failure is logged, and that server's
-  * response stays null. The null response is passed on to
-  * {@code processMutationResponseFromOneRegionServer} for its deletes and
-  * puts.
-  *
-  * @return the mutations to retry, or null if none need retrying
-  * @throws InterruptedException if interrupted while waiting on a future
-  * @throws IOException on a non-retriable failure
-  */
- private List<Mutation> collectResponsesForMutateFromAllRS(byte[] tableName,
-     Map<HServerAddress, MultiAction> actionsByServer,
-     Map<HServerAddress, Future<MultiResponse>> futures,
-     Map<String, HRegionFailureInfo> failureInfo)
-     throws InterruptedException, IOException {
-
-   List<Mutation> newWorkingList = null;
-   for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
-       .entrySet()) {
-     HServerAddress address = responsePerServer.getKey();
-     MultiAction request = actionsByServer.get(address);
-
-     MultiResponse resp = null;
-     try {
-       // An InterruptedException from get() propagates to the caller as-is.
-       resp = responsePerServer.getValue().get();
-     } catch (ExecutionException e) {
-       if (e.getCause() instanceof DoNotRetryIOException) {
-         throw (DoNotRetryIOException) e.getCause();
-       }
-       // Log instead of dropping the failure so its root cause stays visible.
-       // The null response is then handled per region below.
-       LOG.warn("Multi action to server " + address + " failed", e);
-     }
-
-     // Walk the per-region results for deletes and puts; a null response
-     // sends the affected operations back into newWorkingList.
-     if (request.getDeletes() != null) {
-       newWorkingList = processMutationResponseFromOneRegionServer(
-           tableName, address, resp, request.getDeletes(), newWorkingList,
-           true, failureInfo);
-     }
-     if (request.getPuts() != null) {
-       newWorkingList = processMutationResponseFromOneRegionServer(
-           tableName, address, resp, request.getPuts(), newWorkingList,
-           false, failureInfo);
-     }
-   }
-   return newWorkingList;
- }
-
- /**
-  * Waits for each region server's multi-get response, fills
-  * {@code results}, and collects the gets that must be retried.
-  *
-  * A future that failed with a {@link DoNotRetryIOException} cause aborts
-  * the whole batch. Any other execution failure is logged through the class
-  * logger, and that server's response stays null. The null response is then
-  * passed to {@code processGetResponseFromOneRegionServer}.
-  *
-  * @return the gets to retry, or null if none need retrying
-  * @throws IOException on a non-retriable failure
-  * @throws InterruptedException if interrupted while waiting on a future
-  */
- private List<Get> collectResponsesForGetFromAllRS(byte[] tableName,
-     Map<HServerAddress, MultiAction> actionsByServer,
-     Map<HServerAddress, Future<MultiResponse>> futures,
-     List<Get> orig_list, Result[] results,
-     Map<String, HRegionFailureInfo> failureInfo) throws IOException,
-     InterruptedException {
-
-   List<Get> newWorkingList = null;
-   for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
-       .entrySet()) {
-     HServerAddress address = responsePerServer.getKey();
-     MultiAction request = actionsByServer.get(address);
-
-     MultiResponse resp = null;
-     try {
-       // An InterruptedException from get() propagates to the caller as-is.
-       resp = responsePerServer.getValue().get();
-     } catch (ExecutionException e) {
-       if (e.getCause() instanceof DoNotRetryIOException) {
-         throw (DoNotRetryIOException) e.getCause();
-       }
-       // Report through the logger rather than printStackTrace() to stderr.
-       LOG.warn("Multi get to server " + address + " failed", e);
-     }
-
-     if (resp == null) {
-       // Entire server failed
-       LOG.debug("Failed all for server: " + address
-           + ", removing from cache");
-     }
-
-     newWorkingList = processGetResponseFromOneRegionServer(tableName,
-         address, request, resp, orig_list, newWorkingList, results,
-         failureInfo);
-   }
-   return newWorkingList;
- }
-
- /**
-  * Folds one region server's put or delete results into the retry list.
-  *
-  * For each region in {@code map}, a result other than -1 re-queues the
-  * region's operations from that index to the end.
-  *
-  * If reading the result throws, the following happens. The original notes
-  * this includes an NPE when {@code resp} is null.
-  * <ul>
-  *   <li>The exception is passed to {@code translateException}.</li>
-  *   <li>It is recorded in {@code failureInfo} under the region name,
-  *       together with the server's hostname.</li>
-  *   <li>All of the region's operations are re-queued.</li>
-  *   <li>The cached location of the region's first row is evicted.</li>
-  * </ul>
-  *
-  * @param isDelete true to read delete results, false to read put results
-  * @return {@code newWorkingList}, or a newly allocated list if it was null
-  *         and something had to be re-queued
-  * @throws IOException if {@code translateException} throws one
-  */
- private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
-     byte[] tableName, HServerAddress address, MultiResponse resp,
-     Map<byte[], List<R>> map, List<Mutation> newWorkingList,
-     boolean isDelete, Map<String, HRegionFailureInfo> failureInfo)
-     throws IOException {
-   List<Mutation> retries = newWorkingList;
-   for (Map.Entry<byte[], List<R>> regionEntry : map.entrySet()) {
-     byte[] regionName = regionEntry.getKey();
-     List<R> ops = regionEntry.getValue();
-     try {
-       long firstToRetry = isDelete
-           ? resp.getDeleteResult(regionName)
-           : resp.getPutResult(regionName);
-       if (firstToRetry != -1) {
-         if (retries == null) {
-           retries = new ArrayList<Mutation>();
-         }
-         retries.addAll(ops.subList((int) firstToRetry, ops.size()));
-       }
-     } catch (Exception ex) {
-       String serverName = address.getHostname();
-       String regName = Bytes.toStringBinary(regionName);
-       // A null resp surfaces here as an NPE; translateException may rethrow.
-       translateException(ex);
-
-       if (!failureInfo.containsKey(regName)) {
-         failureInfo.put(regName, new HRegionFailureInfo(regName));
-       }
-       HRegionFailureInfo regionFailure = failureInfo.get(regName);
-       regionFailure.addException(ex);
-       regionFailure.setServerName(serverName);
-
-       if (retries == null) {
-         retries = new ArrayList<Mutation>();
-       }
-       retries.addAll(ops);
-       // Evicting the cached location for one row covers the whole region.
-       deleteCachedLocation(tableName, ops.get(0).getRow(), address);
-     }
-   }
-   return retries;
- }
-
- private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
- HServerAddress address, MultiAction request, MultiResponse resp,
- List<Get> orig_list, List<Get> newWorkingList, Result[] results,
- Map<String, HRegionFailureInfo> failureInfo) throws IOException {
-
- for (Map.Entry<byte[], List<Get>> e : request.getGets().entrySet()) {
- byte[] regionName = e.getKey();
- List<Get> regionGets = request.getGets().get(regionName);
- List<Integer> origIndices = request.getOriginalIndex().get(regionName);
-
- Result[] getResult;
- try {
- getResult = resp.getGetResult(regionName);
-
- // fill up the result[] array accordingly
- assert (origIndices.size() == getResult.length);
[... 1271 lines stripped ...]