You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:09 UTC
svn commit: r1584162 [4/5] - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
test/java/org/apache/hadoop/hbase/client/
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java?rev=1584162&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/TableServers.java Wed Apr 2 20:49:09 2014
@@ -0,0 +1,3512 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.SyncFailedException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
+import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ThriftClientInterface;
+import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.thrift.transport.TTransportException;
+
+import com.google.common.base.Preconditions;
+
+/* Encapsulates finding the servers for an HBase instance */
+public class TableServers implements ServerConnection {
+ static final Log LOG = LogFactory.getLog(TableServers.class);
+ // Region server interface class loaded in the constructor from
+ // HConstants.REGION_SERVER_CLASS.
+ private final Class<? extends HRegionInterface> serverInterfaceClass;
+ // Max regions prefetched into the location cache per META scan
+ // ("hbase.client.prefetch.limit", default 10).
+ private final int prefetchRegionLimit;
+
+ // Held by getMaster() for its whole retry loop; the loop also waits on it
+ // between attempts.
+ private final Object masterLock = new Object();
+ // getMaster() stops trying once this is true.
+ private volatile boolean closed;
+ // Cached master proxy; null until getMaster() succeeds.
+ private volatile HMasterInterface master;
+ // Set after getMaster() has run its retry loop once; later calls do not retry.
+ private volatile boolean masterChecked;
+
+ // Serializes lookups of the ROOT region location.
+ private final Object rootRegionLock = new Object();
+ // Lock passed to locateMetaInRoot() for META region lookups.
+ private final Object metaRegionLock = new Object();
+ // Lock passed to locateRegionInMeta() for user-table region lookups.
+ private final Object userRegionLock = new Object();
+
+ private volatile Configuration conf;
+ // Retry count, pause times and RPC timeout used by the lookups below.
+ private final HConnectionParams params;
+
+ // Used by master and region servers during safe mode only
+ private volatile HRegionLocation rootRegionLocation;
+
+ // Region location cache. NOTE(review): the Integer key is presumably derived
+ // from the table name and the inner byte[] key is presumably a region start
+ // key -- confirm against cacheLocation()/getCachedLocation().
+ private final Map<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>> cachedRegionLocations = new ConcurrentHashMap<Integer, ConcurrentSkipListMap<byte[], HRegionLocation>>();
+
+ // Amount of time to wait before we consider a server to be in fast-fail
+ // mode ("hbase.client.fastfail.threshold", default 60000 ms).
+ protected long fastFailThresholdMilliSec;
+ // Keeps track of failures when we cannot talk to a server. Helps in
+ // fast failing clients if the server is down for a long time.
+ protected final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap = new ConcurrentHashMap<HServerAddress, FailureInfo>();
+ // repeatedFailuresMap gains an entry on every failure. To keep it from
+ // growing unbounded, the failure information is garbage-collected every
+ // cleanup interval
+ // ("hbase.client.fastfail.cleanup.map.interval.millisec", default 600000 ms).
+ protected final long failureMapCleanupIntervalMilliSec;
+ // NOTE(review): presumably the time of the last cleanup of
+ // repeatedFailuresMap; it is not assigned in the code visible here.
+ protected volatile long lastFailureMapCleanupTimeMilliSec;
+ // Minimum time between clearings of the region -> regionserver cache while
+ // in fast-fail mode; used to clean out unused entries
+ // ("hbase.client.fastfail.cache.clear.interval", default 10000 ms).
+ protected long cacheClearingTimeoutMilliSec;
+ // Time after which all failure info is cleared. A safety valve in case the
+ // client does not exit fast-fail mode for any reason
+ // ("hbase.client.fastfail.cleanup.all.millisec", default 900000 ms).
+ private long fastFailClearingTimeMilliSec;
+ // When false, the per-thread operation context is neither read nor cleared
+ // ("hbase.client.record.context", default false).
+ private final boolean recordClientContext;
+
+ // Per-thread list of recorded operation contexts; see
+ // getAndResetOperationContext().
+ private ThreadLocal<List<OperationContext>> operationContextPerThread = new ThreadLocal<List<OperationContext>>();
+
+ /**
+  * Empties the operation-context list recorded for the calling thread.
+  * Does nothing when context recording is disabled or the thread has no
+  * list yet.
+  */
+ @Override
+ public void resetOperationContext() {
+   List<OperationContext> recorded =
+       (recordClientContext && operationContextPerThread != null)
+           ? operationContextPerThread.get()
+           : null;
+   if (recorded != null) {
+     recorded.clear();
+   }
+ }
+
+ /**
+  * Hands back a snapshot of the calling thread's recorded operation context
+  * and empties the live per-thread list.
+  *
+  * @return a new list with the recorded entries, or {@code null} when
+  *         context recording is disabled or this thread has no list yet
+  */
+ @Override
+ public List<OperationContext> getAndResetOperationContext() {
+   if (recordClientContext && operationContextPerThread != null) {
+     List<OperationContext> live = operationContextPerThread.get();
+     if (live != null) {
+       List<OperationContext> snapshot = new ArrayList<OperationContext>(live);
+       // The caller owns the copy; the thread keeps its (now empty) list.
+       live.clear();
+       return snapshot;
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Keeps track of repeated failures to any region server.
+ *
+ * @author amitanand.s
+ *
+ */
+ protected class FailureInfo {
+ // The number of consecutive failures.
+ private final AtomicLong numConsecutiveFailures = new AtomicLong();
+ // The time when the server started to become unresponsive
+ // Once set, this would never be updated.
+ private long timeOfFirstFailureMilliSec;
+ // The time when the client last tried to contact the server.
+ // This is only updated by one client at a time
+ private volatile long timeOfLatestAttemptMilliSec;
+ // The time when the client last cleared cache for regions assigned
+ // to the server. Used to ensure we don't clearCache too often.
+ private volatile long timeOfLatestCacheClearMilliSec;
+ // Used to keep track of concurrent attempts to contact the server.
+ // In Fast fail mode, we want just one client thread to try to connect
+ // the rest of the client threads will fail fast.
+ private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
+ false);
+
+ @Override
+ public String toString() {
+ return "FailureInfo: numConsecutiveFailures = "
+ + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
+ + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
+ + timeOfLatestAttemptMilliSec
+ + " timeOfLatestCacheClearMilliSec = "
+ + timeOfLatestCacheClearMilliSec
+ + " exclusivelyRetringInspiteOfFastFail = "
+ + exclusivelyRetringInspiteOfFastFail.get();
+ }
+
+ FailureInfo(long firstFailureTime) {
+ this.timeOfFirstFailureMilliSec = firstFailureTime;
+ }
+ }
+
+ // Per-thread flag. NOTE(review): presumably marks the thread that is allowed
+ // to keep retrying a server in fast-fail mode -- confirm against its users.
+ private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode = new ThreadLocal<MutableBoolean>();
+
+ /**
+  * For TESTING purposes only. Returns the live server-to-failure map
+  * itself, not a copy.
+  */
+ public Map<HServerAddress, FailureInfo> getFailureMap() {
+   final Map<HServerAddress, FailureInfo> live = this.repeatedFailuresMap;
+   return live;
+ }
+
+ // The presence of a server in this set implies it's likely that there is an
+ // entry in cachedRegionLocations that maps to this server; but the absence
+ // of a server in this set guarantees that there is no entry in the cache
+ // that maps to the absent server.
+ // NOTE(review): this is a plain (non-thread-safe) HashSet; confirm every
+ // access elsewhere in the class is synchronized.
+ private final Set<String> cachedServers =
+ new HashSet<String>();
+
+ // Region cache prefetch is enabled by default. This set contains all
+ // tables whose region cache prefetch is disabled.
+ // NOTE(review): the Integer presumably identifies the table -- confirm.
+ private final Set<Integer> regionCachePrefetchDisabledTables =
+ new CopyOnWriteArraySet<Integer>();
+ // Keeps track of servers that have been updated for batchedLoad:
+ // table name -> (region -> location).
+ Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
+ // "hbase.client.batched-upload.softflush.retries" (default 10).
+ private int batchedUploadSoftFlushRetries;
+ // "hbase.client.batched-upload.softflush.timeout.ms" (default 60000).
+ private long batchedUploadSoftFlushTimeoutMillis;
+ // Whether client -> region server calls use Thrift
+ // (HConstants.CLIENT_TO_RS_USE_THRIFT).
+ private final boolean useThrift;
+
+ // Sorted by Bytes.BYTES_COMPARATOR so byte[] table names compare by content.
+ private ConcurrentSkipListSet<byte[]> initializedTableSet =
+ new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ /**
+ * constructor
+ * @param conf Configuration object
+ */
+ @SuppressWarnings("unchecked")
+ public TableServers(Configuration conf) {
+ this.conf = conf;
+ params = HConnectionParams.getInstance(conf);
+ this.useThrift = conf.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
+ HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
+
+ String serverClassName =
+ conf.get(HConstants.REGION_SERVER_CLASS,
+ HConstants.DEFAULT_REGION_SERVER_CLASS);
+
+ this.closed = false;
+
+ try {
+ this.serverInterfaceClass =
+ (Class<? extends HRegionInterface>) Class.forName(serverClassName);
+
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find region server interface " + serverClassName, e);
+ }
+
+ // TODO move parameters below into HConnectionParams
+ this.cacheClearingTimeoutMilliSec = conf.getLong(
+ "hbase.client.fastfail.cache.clear.interval",
+ 10000); // 10 sec
+ this.fastFailThresholdMilliSec = conf.getLong(
+ "hbase.client.fastfail.threshold",
+ 60000); // 1 min
+ this.failureMapCleanupIntervalMilliSec = conf.getLong(
+ "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
+ this.fastFailClearingTimeMilliSec = conf.getLong(
+ "hbase.client.fastfail.cleanup.all.millisec", 900000); // 15 mins
+
+ this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
+ 10);
+
+ this.master = null;
+ this.masterChecked = false;
+ this.batchedUploadSoftFlushRetries =
+ conf.getInt("hbase.client.batched-upload.softflush.retries", 10);
+ this.batchedUploadSoftFlushTimeoutMillis =
+ conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
+ batchedUploadUpdatesMap = new ConcurrentHashMap<String,
+ ConcurrentMap<HRegionInfo, HRegionLocation>>();
+
+ this.recordClientContext = conf.getBoolean("hbase.client.record.context", false);
+
+ }
+
+ /**
+  * Drops the cached ROOT region location, so the next ROOT lookup has to
+  * re-resolve it. Used by master and region servers during safe mode only.
+  */
+ @Override
+ public void unsetRootRegionLocation() {
+   rootRegionLocation = null;
+ }
+
+ /**
+  * Pins the ROOT region location. Used by master and region servers during
+  * safe mode only.
+  *
+  * @param rootRegion location to use; must not be null
+  * @throws IllegalArgumentException if {@code rootRegion} is null
+  */
+ @Override
+ public void setRootRegionLocation(HRegionLocation rootRegion) {
+   if (rootRegion != null) {
+     rootRegionLocation = rootRegion;
+     return;
+   }
+   throw new IllegalArgumentException(
+       "Cannot set root region location to null.");
+ }
+
+ @Override
+ public HMasterInterface getMaster() throws MasterNotRunningException {
+ ZooKeeperWrapper zk;
+ try {
+ zk = getZooKeeperWrapper();
+ } catch (IOException e) {
+ throw new MasterNotRunningException(e);
+ }
+
+ HServerAddress masterLocation = null;
+ synchronized (this.masterLock) {
+ for (int tries = 0; !this.closed && !this.masterChecked
+ && this.master == null && tries < params.getNumRetries(); tries++) {
+
+ try {
+ masterLocation = zk.readMasterAddress(zk);
+
+ if (masterLocation != null) {
+ HMasterInterface tryMaster = (HMasterInterface) HBaseRPC.getProxy(
+ HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
+ masterLocation.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), HBaseRPCOptions.DEFAULT);
+
+ if (tryMaster.isMasterRunning()) {
+ this.master = tryMaster;
+ this.masterLock.notifyAll();
+ break;
+ }
+ }
+
+ } catch (IOException e) {
+ if (tries == params.getNumRetries() - 1) {
+ // This was our last chance - don't bother sleeping
+ LOG.info(
+ "getMaster attempt " + tries + " of "
+ + params.getNumRetries() + " failed; no more retrying.",
+ e);
+ break;
+ }
+ LOG.info(
+ "getMaster attempt " + tries + " of " + params.getNumRetries()
+ + " failed; retrying after sleep of "
+ + params.getPauseTime(tries), e);
+ }
+
+ // Cannot connect to master or it is not running. Sleep & retry
+ try {
+ this.masterLock.wait(params.getPauseTime(tries));
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ this.masterChecked = true;
+ }
+ if (this.master == null) {
+ if (masterLocation == null) {
+ throw new MasterNotRunningException();
+ }
+ throw new MasterNotRunningException(masterLocation.toString());
+ }
+ return this.master;
+ }
+
+ /**
+  * Reports whether a master proxy is available. If none is cached yet, one
+  * is requested through getMaster().
+  *
+  * @return {@code false} only when getMaster() throws
+  *         MasterNotRunningException
+  */
+ @Override
+ public boolean isMasterRunning() {
+   if (this.master != null) {
+     return true;
+   }
+   try {
+     getMaster();
+     return true;
+   } catch (MasterNotRunningException e) {
+     return false;
+   }
+ }
+
+ /**
+  * Checks whether {@code tableName} names an existing table.
+  *
+  * Catalog tables always exist. For other tables, the descriptors from
+  * listTables() are searched. An IOException from that scan is logged and
+  * reported as "does not exist".
+  *
+  * @throws MasterNotRunningException if the master cannot be reached
+  * @throws IllegalArgumentException if {@code tableName} is null
+  */
+ @Override
+ public boolean tableExists(final byte[] tableName)
+     throws MasterNotRunningException {
+   getMaster();
+   if (tableName == null) {
+     throw new IllegalArgumentException("Table name cannot be null");
+   }
+   if (isMetaTableName(tableName)) {
+     return true;
+   }
+   try {
+     for (HTableDescriptor candidate : listTables()) {
+       if (Bytes.equals(candidate.getName(), tableName)) {
+         return true;
+       }
+     }
+   } catch (IOException e) {
+     LOG.warn("Testing for table existence threw exception", e);
+   }
+   return false;
+ }
+
+ /*
+ * @param n
+ *
+ * @return Truen if passed tablename <code>n</code> is equal to the name of
+ * a catalog table.
+ */
+ private static boolean isMetaTableName(final byte[] n) {
+ return MetaUtils.isMetaTableName(n);
+ }
+
+ /**
+  * Finds the region of table {@code name} that holds {@code row}.
+  *
+  * @param reload when true, bypass the location cache (relocateRegion);
+  *        otherwise a cached location may be returned (locateRegion)
+  */
+ @Override
+ public HRegionLocation getRegionLocation(final byte[] name,
+     final byte[] row, boolean reload) throws IOException {
+   if (reload) {
+     return relocateRegion(name, row);
+   }
+   return locateRegion(name, row);
+ }
+
+ /**
+  * Lists the descriptor of every table found by a MetaScanner scan.
+  *
+  * Each table is reported once, taken from its region with an empty start
+  * key. Rows without region info are skipped. A RuntimeException while
+  * handling a row is logged together with the row and rethrown.
+  */
+ @Override
+ public HTableDescriptor[] listTables() throws IOException {
+   getMaster();
+   final TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
+   MetaScanner.metaScan(conf, new MetaScannerVisitor() {
+     @Override
+     public boolean processRow(Result result) throws IOException {
+       try {
+         byte[] infoBytes = result.getValue(HConstants.CATALOG_FAMILY,
+             HConstants.REGIONINFO_QUALIFIER);
+         if (infoBytes == null) {
+           return true;
+         }
+         HRegionInfo info = Writables.getHRegionInfo(infoBytes);
+         // Only the first region of each table (empty start key) counts.
+         if (info != null && info.getStartKey().length == 0) {
+           uniqueTables.add(info.getTableDesc());
+         }
+         return true;
+       } catch (RuntimeException e) {
+         LOG.error("Result=" + result);
+         throw e;
+       }
+     }
+   });
+   return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
+ }
+
+ /**
+  * @return true if the table's regions are all online, as decided by
+  *         testTableOnlineState
+  */
+ @Override
+ public boolean isTableEnabled(byte[] tableName) throws IOException {
+   final boolean wantOnline = true;
+   return testTableOnlineState(tableName, wantOnline);
+ }
+
+ /**
+  * @return true if the table's regions are all offline, as decided by
+  *         testTableOnlineState
+  */
+ @Override
+ public boolean isTableDisabled(byte[] tableName) throws IOException {
+   final boolean wantOnline = false;
+   return testTableOnlineState(tableName, wantOnline);
+ }
+
+ /**
+  * Checks that every catalog row of {@code tableName} has a server column.
+  *
+  * The MetaScanner scan stops at the first region of the table with no
+  * server. A table with no catalog rows at all is reported as available.
+  */
+ @Override
+ public boolean isTableAvailable(final byte[] tableName) throws IOException {
+   final AtomicBoolean available = new AtomicBoolean(true);
+   MetaScanner.metaScan(conf, new MetaScannerVisitor() {
+     @Override
+     public boolean processRow(Result row) throws IOException {
+       HRegionInfo info = Writables.getHRegionInfoOrNull(row.getValue(
+           HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+       boolean ofThisTable = info != null
+           && Bytes.equals(tableName, info.getTableDesc().getName());
+       if (ofThisTable && row.getValue(HConstants.CATALOG_FAMILY,
+           HConstants.SERVER_QUALIFIER) == null) {
+         // Found a region with no server; no need to look further.
+         available.set(false);
+         return false;
+       }
+       return true;
+     }
+   });
+   return available.get();
+ }
+
+ /*
+  * Scans the catalog rows of tableName and inspects each region's offline
+  * flag.
+  *
+  * If online == true, returns true if all regions are online.
+  * If online == false, returns true if all regions are offline.
+  * Returns false in any other case, including when no region of the table
+  * was found. ROOT is always online, so it is reported as enabled and never
+  * as disabled.
+  *
+  * @throws TableNotFoundException if tableExists(tableName) is false
+  */
+ private boolean testTableOnlineState(byte[] tableName, boolean online)
+     throws IOException {
+   if (!tableExists(tableName)) {
+     throw new TableNotFoundException(Bytes.toString(tableName));
+   }
+   if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+     // The root region is always enabled: "enabled" holds, "disabled" does not.
+     return online;
+   }
+   int rowsScanned = 0;
+   int rowsOffline = 0;
+   byte[] startKey = HRegionInfo.createRegionName(tableName, null,
+       HConstants.ZEROES, false);
+   byte[] endKey;
+   HRegionInfo currentRegion;
+   Scan scan = new Scan(startKey);
+   scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+   int rows = this.conf.getInt("hbase.meta.scanner.caching", 100);
+   scan.setCaching(rows);
+   // META's own regions are listed in ROOT; every other table's are in META.
+   ScannerCallable s = new ScannerCallable(this, (Bytes.equals(tableName,
+       HConstants.META_TABLE_NAME) ? HConstants.ROOT_TABLE_NAME
+       : HConstants.META_TABLE_NAME), scan, HBaseRPCOptions.DEFAULT);
+   try {
+     // Open scanner
+     getRegionServerWithRetries(s);
+     // NOTE(review): nothing in this loop moves the scanner to the next
+     // catalog region, so a non-empty end key on the first catalog region
+     // would repeat the same region forever. Confirm the catalog table is a
+     // single region here, or that ScannerCallable handles region changes.
+     do {
+       currentRegion = s.getHRegionInfo();
+       Result[] rrs;
+       while ((rrs = getRegionServerWithRetries(s)) != null
+           && rrs.length > 0) {
+         // Count every row of the batch, not just the first, so the totals
+         // stay correct when the scanner returns several rows per call.
+         for (Result r : rrs) {
+           byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
+               HConstants.REGIONINFO_QUALIFIER);
+           HRegionInfo info = (value == null) ? null
+               : Writables.getHRegionInfoOrNull(value);
+           if (info != null
+               && Bytes.equals(info.getTableDesc().getName(), tableName)) {
+             rowsScanned += 1;
+             rowsOffline += info.isOffline() ? 1 : 0;
+           }
+         }
+       }
+       endKey = currentRegion.getEndKey();
+     } while (!(endKey == null || Bytes.equals(endKey,
+         HConstants.EMPTY_BYTE_ARRAY)));
+   } finally {
+     s.setClose();
+     // Doing below will call 'next' again and this will close the scanner
+     // Without it we leave scanners open.
+     getRegionServerWithRetries(s);
+   }
+   LOG.debug("Rowscanned=" + rowsScanned + ", rowsOffline=" + rowsOffline);
+   boolean onOffLine = online ? rowsOffline == 0
+       : rowsOffline == rowsScanned;
+   return rowsScanned > 0 && onOffLine;
+ }
+
+ /**
+  * MetaScanner visitor that captures the descriptor from the first catalog
+  * row belonging to {@code tableName}, then stops the scan.
+  */
+ private static class HTableDescriptorFinder implements
+     MetaScanner.MetaScannerVisitor {
+   byte[] tableName;
+   HTableDescriptor result;
+
+   protected HTableDescriptorFinder(byte[] tableName) {
+     this.tableName = tableName;
+   }
+
+   /**
+    * Records the descriptor if this row's region belongs to the target
+    * table. Rows that carry no region info are skipped instead of aborting
+    * the whole lookup.
+    *
+    * @return false (stop scanning) once the descriptor has been found
+    */
+   @Override
+   public boolean processRow(Result rowResult) throws IOException {
+     HRegionInfo info = Writables.getHRegionInfoOrNull(rowResult.getValue(
+         HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+     if (info == null) {
+       return true;
+     }
+     HTableDescriptor desc = info.getTableDesc();
+     if (Bytes.compareTo(desc.getName(), tableName) == 0) {
+       result = desc;
+       return false;
+     }
+     return true;
+   }
+
+   /** @return the matching descriptor, or null if none was seen */
+   HTableDescriptor getResult() {
+     return result;
+   }
+ }
+
+ /**
+  * Returns the descriptor for {@code tableName}.
+  *
+  * For ROOT, HTableDescriptor.ROOT_TABLEDESC is returned wrapped in an
+  * UnmodifyableHTableDescriptor. Other tables are looked up with a
+  * MetaScanner scan for the table.
+  * NOTE(review): for META, the shared static HTableDescriptor.META_TABLEDESC
+  * is returned unwrapped, unlike ROOT -- confirm callers never mutate it.
+  *
+  * @throws TableNotFoundException if no catalog row of the table is found
+  */
+ @Override
+ public HTableDescriptor getHTableDescriptor(final byte[] tableName)
+     throws IOException {
+   if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+     return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
+   }
+   if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+     return HTableDescriptor.META_TABLEDESC;
+   }
+   HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName);
+   MetaScanner.metaScan(conf, finder, tableName);
+   HTableDescriptor found = finder.getResult();
+   if (found != null) {
+     return found;
+   }
+   throw new TableNotFoundException(Bytes.toString(tableName));
+ }
+
+ /**
+  * Finds the region of {@code tableName} holding {@code row}, allowing a
+  * cached location to be returned.
+  */
+ @Override
+ public HRegionLocation locateRegion(final byte[] tableName, final byte[] row)
+     throws IOException {
+   final boolean useCache = true;
+   return locateRegion(tableName, row, useCache);
+ }
+
+ /**
+  * Finds the region of {@code tableName} holding {@code row}, ignoring any
+  * cached location.
+  */
+ @Override
+ public HRegionLocation relocateRegion(final byte[] tableName,
+     final byte[] row) throws IOException {
+   final boolean useCache = false;
+   return locateRegion(tableName, row, useCache);
+ }
+
+ /**
+  * Routes a region lookup by catalog level:
+  * - META regions are looked up in ROOT via locateMetaInRoot();
+  * - ROOT's location comes from locateRootRegion();
+  * - user-table regions are looked up in META via locateRegionInMeta().
+  *
+  * @param useCache whether a cached location may be returned
+  * @throws IllegalArgumentException if {@code tableName} is null or empty
+  */
+ private HRegionLocation locateRegion(final byte[] tableName,
+     final byte[] row, boolean useCache) throws IOException {
+   if (tableName == null || tableName.length == 0) {
+     throw new IllegalArgumentException(
+         "table name cannot be null or zero length");
+   }
+   if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
+     return locateMetaInRoot(row, useCache, metaRegionLock);
+   }
+   if (!Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+     // User table: consult the cache and, if needed, the META region server.
+     return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
+         useCache, userRegionLock);
+   }
+   // ROOT: only one thread at a time resolves the location. Threads that
+   // were waiting reuse what the first one found unless they bypass the cache.
+   synchronized (rootRegionLock) {
+     boolean refresh = !useCache || rootRegionLocation == null
+         || inFastFailMode(this.rootRegionLocation.getServerAddress());
+     if (refresh) {
+       this.rootRegionLocation = locateRootRegion();
+       LOG.info("Updated rootRegionLocation from ZK to : " + this.rootRegionLocation);
+     }
+     return this.rootRegionLocation;
+   }
+ }
+
+ private HRegionLocation prefetchRegionCache(final byte[] tableName,
+ final byte[] row) {
+ return prefetchRegionCache(tableName, row, this.prefetchRegionLimit);
+ }
+
+ /*
+  * Walks .META. with a MetaScanner starting at (tableName, row) and caches
+  * the locations of up to prefetchRegionLimit regions of tableName.
+  * Offline regions and regions without a server column are not cached. The
+  * walk stops at the first row belonging to a different table. Scan
+  * failures are logged, not thrown.
+  *
+  * Returns the cached location for row after the walk, or null if there is
+  * none or the scan failed.
+  */
+ private HRegionLocation prefetchRegionCache(final byte[] tableName,
+     final byte[] row, int prefetchRegionLimit) {
+   MetaScannerVisitor cachingVisitor = new MetaScannerVisitor() {
+     @Override
+     public boolean processRow(Result result) throws IOException {
+       try {
+         return cacheRowIfUsable(result);
+       } catch (RuntimeException e) {
+         throw new IOException(e);
+       }
+     }
+
+     // Caches the row's location when usable. Returns false to stop the
+     // scan, true to keep going.
+     private boolean cacheRowIfUsable(Result result) throws IOException {
+       byte[] infoBytes = result.getValue(HConstants.CATALOG_FAMILY,
+           HConstants.REGIONINFO_QUALIFIER);
+       if (infoBytes == null) {
+         return true;
+       }
+       HRegionInfo regionInfo = Writables.getHRegionInfo(infoBytes);
+       if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+         return false; // reached a different table: stop scanning
+       }
+       if (regionInfo.isOffline()) {
+         return true; // don't cache offline regions
+       }
+       byte[] serverBytes = result.getValue(HConstants.CATALOG_FAMILY,
+           HConstants.SERVER_QUALIFIER);
+       if (serverBytes == null) {
+         return true; // no server recorded: don't cache it
+       }
+       final String serverAddress = Bytes.toString(serverBytes);
+       byte[] startCodeBytes = result.getValue(HConstants.CATALOG_FAMILY,
+           HConstants.STARTCODE_QUALIFIER);
+       long serverStartCode =
+           (startCodeBytes == null) ? -1 : Bytes.toLong(startCodeBytes);
+       cacheLocation(tableName, new HRegionLocation(regionInfo,
+           new HServerAddress(serverAddress), serverStartCode));
+       return true;
+     }
+   };
+   try {
+     // pre-fetch certain number of regions info at region cache.
+     MetaScanner.metaScan(conf, cachingVisitor, tableName, row, prefetchRegionLimit);
+     return getCachedLocation(tableName, row);
+   } catch (IOException e) {
+     LOG.warn("Encounted problems when prefetch META table: ", e);
+     return null;
+   }
+ }
+
+ /**
+  * Searches the parent catalog table ({@code parentTable}, e.g. .META.) for
+  * the HRegionLocation of the region of {@code tableName} that contains
+  * {@code row}, and caches the result.
+  *
+  * Each attempt does the following:
+  * - locates the parent region for the search key;
+  * - applies fast-fail rules for that region's server;
+  * - optionally prefetches locations into the cache;
+  * - queries the parent region with getClosestRowBefore.
+  * Failed attempts are retried up to {@code params.getNumRetries()} times,
+  * pausing {@code params.getPauseTime(tries)} between attempts.
+  *
+  * @param parentTable catalog table listing the regions of tableName
+  * @param useCache whether a cached location may be returned
+  * @param regionLockObject lock serializing concurrent lookups of this kind
+  * @throws TableNotFoundException if no catalog row matches the table
+  * @throws PreemptiveFastFailException if the parent region's server is in
+  *         fast-fail mode and this thread was not chosen to retry it
+  * @throws InterruptedIOException if interrupted while pausing between
+  *         attempts; the thread's interrupt status is restored
+  * @throws IOException the last attempt's failure once retries run out
+  */
+ private HRegionLocation locateRegionInMeta(final byte [] parentTable,
+     final byte [] tableName, final byte [] row, boolean useCache,
+     Object regionLockObject)
+     throws IOException {
+   HRegionLocation location;
+   // If we are supposed to be using the cache, look in the cache to see if
+   // we already have the region.
+   if (useCache) {
+     location = getCachedLocation(tableName, row);
+     if (location != null) {
+       return location;
+     }
+   }
+
+   // build the key of the meta region we should be looking for.
+   // the extra 9's on the end are necessary to allow "exact" matches
+   // without knowing the precise region names.
+   byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
+       HConstants.NINES, false);
+   for (int tries = 0; true; tries++) {
+     if (tries >= params.getNumRetries()) {
+       throw new NoServerForRegionException("Unable to find region for "
+           + Bytes.toStringBinary(row) + " after " + params.getNumRetries() +
+           " tries.");
+     }
+
+     FailureInfo fInfo = null;
+     HServerAddress server = null;
+     boolean didTry = false;
+     boolean couldNotCommunicateWithServer = false;
+     boolean retryDespiteFastFailMode = false;
+     try {
+       // locate the root or meta region
+       HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
+
+       server = metaLocation.getServerAddress();
+       fInfo = repeatedFailuresMap.get(server);
+
+       // Handle the case where .META. is on an unresponsive server.
+       if (inFastFailMode(server) &&
+           !this.currentThreadInFastFailMode()) {
+         // In Fast-fail mode, all but one thread will fast fail. Check
+         // if we are that one chosen thread.
+
+         retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+         if (!retryDespiteFastFailMode) { // we don't have to retry
+           throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+               fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+         }
+       }
+       didTry = true;
+
+       HBaseThriftRPC.isMeta.get().push(true);
+       Result regionInfoRow = null;
+       try {
+         // This block guards against two threads trying to load the meta
+         // region at the same time. The first will load the meta region and
+         // the second will use the value that the first one found.
+         synchronized (regionLockObject) {
+           // Check the cache again for a hit in case some other thread made the
+           // same query while we were waiting on the lock. If not supposed to
+           // be using the cache, delete any existing cached location so it won't
+           // interfere.
+           if (useCache) {
+             location = getCachedLocation(tableName, row);
+             if (location != null) {
+               return location;
+             }
+           } else {
+             LOG.debug("Deleting the client location cache.");
+             deleteCachedLocation(tableName, row, null);
+           }
+
+           // If the parent table is META, we may want to pre-fetch some
+           // region info into the global region cache for this table.
+           if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
+               (getRegionCachePrefetch(tableName)) ) {
+             LOG.debug("Prefetching the client location cache.");
+             location = prefetchRegionCache(tableName, row);
+             if (location != null) {
+               return location;
+             }
+           }
+
+           HRegionInterface serverInterface =
+               getHRegionConnection(metaLocation.getServerAddress());
+
+           // Query the root or meta region for the location of the meta region
+           regionInfoRow = serverInterface.getClosestRowBefore(
+               metaLocation.getRegionInfo().getRegionName(), metaKey,
+               HConstants.CATALOG_FAMILY);
+         }
+       } finally {
+         HBaseThriftRPC.isMeta.get().pop();
+       }
+       location = getLocationFromRow(regionInfoRow, tableName,
+           parentTable, row);
+       cacheLocation(tableName, location);
+       return location;
+     } catch (TableNotFoundException e) {
+       // if we got this error, probably means the table just plain doesn't
+       // exist. rethrow the error immediately. this should always be coming
+       // from the HTable constructor.
+       throw e;
+     } catch (PreemptiveFastFailException e) {
+       // already processed this. Don't process this again.
+       throw e;
+     } catch (IOException e) {
+       if (e instanceof RemoteException) {
+         e = RemoteExceptionHandler
+             .decodeRemoteException((RemoteException) e);
+       } else if (isNetworkException(e)) {
+         couldNotCommunicateWithServer = true;
+         handleFailureToServer(server, e);
+       }
+       if (tries < params.getNumRetries() - 1) {
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("locateRegionInMeta attempt " + tries + " of "
+               + params.getNumRetries()
+               + " failed; retrying after sleep of "
+               + params.getPauseTime(tries) + " because: " + e.getMessage());
+         }
+       } else {
+         throw e;
+       }
+       // Only relocate the parent region if necessary. Use metaKey, the same
+       // key the parent region was located with above, so the stale parent
+       // location is the one that gets refreshed before the next attempt.
+       if (!(e instanceof RegionOfflineException || e instanceof NoServerForRegionException)) {
+         relocateRegion(parentTable, metaKey);
+       }
+     } catch (Exception e) {
+       couldNotCommunicateWithServer = true;
+       handleFailureToServer(server, e);
+       if (tries < params.getNumRetries() - 1) {
+         LOG.debug("locateRegionInMeta attempt " + tries + " of "
+             + params.getNumRetries() + " failed; retrying after sleep of "
+             + params.getPauseTime(tries) + " because: " + e.getMessage());
+       } else {
+         throw e;
+       }
+     } finally {
+       updateFailureInfoForServer(server, fInfo, didTry,
+           couldNotCommunicateWithServer, retryDespiteFastFailMode);
+     }
+     try {
+       Thread.sleep(params.getPauseTime(tries));
+     } catch (InterruptedException e) {
+       // Don't swallow the interrupt: restore it and stop retrying.
+       Thread.currentThread().interrupt();
+       InterruptedIOException iie = new InterruptedIOException(
+           "Giving up trying to locate region in "
+           + Bytes.toString(parentTable) + ": thread is interrupted.");
+       iie.initCause(e);
+       throw iie;
+     }
+   }
+ }
+
+/**
+ * Turns a catalog row into an HRegionLocation after validating it.
+ *
+ * @param regionInfoRow catalog row; null means nothing was found
+ * @param tableName table whose region is wanted
+ * @param parentTable catalog table the row came from (used in messages)
+ * @param row user row being located (used in messages)
+ * @throws TableNotFoundException if the row is null or belongs to another table
+ * @throws RegionOfflineException if the region is marked offline
+ * @throws NoServerForRegionException if the row records no server address
+ * @throws IOException if the row has no region info
+ */
+private HRegionLocation getLocationFromRow(Result regionInfoRow,
+    byte[] tableName, byte[] parentTable, byte[] row) throws IOException {
+  if (regionInfoRow == null) {
+    throw new TableNotFoundException(Bytes.toString(tableName));
+  }
+  byte[] infoBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+      HConstants.REGIONINFO_QUALIFIER);
+  if (infoBytes == null || infoBytes.length == 0) {
+    throw new IOException("HRegionInfo was null or empty in "
+        + Bytes.toString(parentTable) + ", row=" + regionInfoRow);
+  }
+  HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(infoBytes,
+      new HRegionInfo());
+  // The returned row may describe a region of a different table.
+  if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+    throw new TableNotFoundException("Table '" + Bytes.toString(tableName)
+        + "' was not found.");
+  }
+  if (regionInfo.isOffline()) {
+    throw new RegionOfflineException("region offline: "
+        + regionInfo.getRegionNameAsString());
+  }
+
+  byte[] serverBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+      HConstants.SERVER_QUALIFIER);
+  String serverAddress = (serverBytes == null) ? "" : Bytes.toString(serverBytes);
+  if (serverAddress.isEmpty()) {
+    throw new NoServerForRegionException("No server address listed "
+        + "in " + Bytes.toString(parentTable) + " for region "
+        + regionInfo.getRegionNameAsString() + " containing row "
+        + Bytes.toStringBinary(row));
+  }
+
+  byte[] startCodeBytes = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+      HConstants.STARTCODE_QUALIFIER);
+  long serverStartCode =
+      (startCodeBytes == null) ? -1 : Bytes.toLong(startCodeBytes);
+  return new HRegionLocation(regionInfo,
+      new HServerAddress(serverAddress), serverStartCode);
+}
+
+/**
+ * Searches the ROOT table for the META region containing {@code row}.
+ * Retries a fixed number of times and throws if the region is not found.
+ *
+ * TODO: WARNING! This duplicates much of
+ * {@link #locateRegionInMeta(byte[], byte[], byte[], boolean, Object)};
+ * please refactor the two to share code.
+ */
+private HRegionLocation locateMetaInRoot(final byte[] row,
+ boolean useCache, Object regionLockObject) throws IOException {
+ HRegionLocation location;
+ final byte[] parentTable = HConstants.ROOT_TABLE_NAME;
+ final byte[] tableName = HConstants.META_TABLE_NAME;
+ if (useCache) {
+ location = getCachedLocation(tableName, row);
+ if (location != null) {
+ return location;
+ }
+ }
+ // If we are supposed to be using the cache, look in the cache to see if
+ // we already have the region.
+
+ // build the key of the meta region we should be looking for.
+ // the extra 9's on the end are necessary to allow "exact" matches
+ // without knowing the precise region names.
+ byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
+ HConstants.NINES, false);
+ for (int tries = 0; true; tries++) {
+ if (tries >= params.getNumRetries()) {
+ throw new NoServerForRegionException("Unable to find region for "
+ + Bytes.toStringBinary(row) + " after " + params.getNumRetries()
+ + " tries.");
+ }
+
+ TableServers.FailureInfo fInfo = null;
+ HServerAddress server = null;
+ boolean didTry = false;
+ boolean couldNotCommunicateWithServer = false;
+ boolean retryDespiteFastFailMode = false;
+ try {
+ // locate the root or meta region
+ HRegionLocation metaLocation = null;
+ if (useCache && rootRegionLocation != null
+ && !inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+ metaLocation = rootRegionLocation;
+ } else {
+ synchronized (rootRegionLock) {
+ // This block guards against two threads trying to find the root
+ // region at the same time. One will go do the find while the
+ // second waits. The second thread will not do find.
+
+ if (!useCache || rootRegionLocation == null
+ || inFastFailMode(this.rootRegionLocation.getServerAddress())) {
+ HBaseThriftRPC.isMeta.get().push(true);
+ try {
+ this.rootRegionLocation = locateRootRegion();
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ HBaseThriftRPC.isMeta.get().pop();
+ }
+ LOG.info("Updated rootRegionLocation from ZK to : "
+ + this.rootRegionLocation);
+ }
+ metaLocation = this.rootRegionLocation;
+ }
+ }
+ Preconditions.checkNotNull(metaLocation);
+ server = metaLocation.getServerAddress();
+ fInfo = repeatedFailuresMap.get(server);
+
+ // Handle the case where .META. is on an unresponsive server.
+ if (inFastFailMode(server) && !this.currentThreadInFastFailMode()) {
+ // In Fast-fail mode, all but one thread will fast fail. Check
+ // if we are that one chosen thread.
+
+ retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+ if (retryDespiteFastFailMode == false) { // we don't have to retry
+ throw new PreemptiveFastFailException(
+ fInfo.numConsecutiveFailures.get(),
+ fInfo.timeOfFirstFailureMilliSec,
+ fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+ }
+ }
+ didTry = true;
+ HBaseThriftRPC.isMeta.get().push(true);
+ Result regionInfoRow = null;
+ // This block guards against two threads trying to load the meta
+ // region at the same time. The first will load the meta region and
+ // the second will use the value that the first one found.
+ synchronized (metaRegionLock) {
+ if (useCache) {
+ location = getCachedLocation(tableName, row);
+ if (location != null) {
+ return location;
+ }
+ } else {
+ LOG.debug("Deleting the client location cache.");
+ deleteCachedLocation(tableName, row, null);
+ }
+ HRegionInterface serverInterface = getHRegionConnection(metaLocation
+ .getServerAddress());
+
+ // Query the root for the location of the meta region
+ regionInfoRow = serverInterface.getClosestRowBefore(metaLocation
+ .getRegionInfo().getRegionName(), metaKey,
+ HConstants.CATALOG_FAMILY);
+ location = getLocationFromRow(regionInfoRow, tableName,
+ parentTable, row);
+ cacheLocation(tableName, location);
+ }
+ return location;
+ } catch (TableNotFoundException e) {
+ // if we got this error, probably means the table just plain doesn't
+ // exist. rethrow the error immediately. this should always be coming
+ // from the HTable constructor.
+ throw e;
+ } catch (PreemptiveFastFailException e) {
+ // already processed this. Don't process this again.
+ throw e;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler
+ .decodeRemoteException((RemoteException) e);
+ } else if (isNetworkException(e)) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server, e);
+ }
+ if (tries < params.getNumRetries() - 1) {
+ LOG.debug("IOException locateRegionInMeta attempt " + tries
+ + " of " + params.getNumRetries()
+ + " failed; retrying after sleep of "
+ + params.getPauseTime(tries) + " because: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server, e);
+ if (tries < params.getNumRetries() - 1) {
+ LOG.debug("Exception locateRegionInMeta attempt " + tries + " of "
+ + params.getNumRetries() + " failed; retrying after sleep of "
+ + params.getPauseTime(tries) + " because: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ } finally {
+ HBaseThriftRPC.isMeta.get().pop();
+ updateFailureInfoForServer(server, fInfo, didTry,
+ couldNotCommunicateWithServer, retryDespiteFastFailMode);
+ }
+ try {
+ Thread.sleep(params.getPauseTime(tries));
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+}
+
+
+
+
+ /**
+ * Check if the exception is something that indicates that we cannot
+ * contact/communicate with the server.
+ *
+ * @param e
+ * @return
+ */
+ private boolean isNetworkException(Throwable e) {
+ // This list covers most connectivity exceptions but not all.
+ // For example, in SocketOutputStream a plain IOException is thrown
+ // at times when the channel is closed.
+ return (e instanceof SocketTimeoutException ||
+ e instanceof ConnectException ||
+ e instanceof ClosedChannelException ||
+ e instanceof SyncFailedException ||
+ e instanceof EOFException ||
+ e instanceof TTransportException);
+ }
+
+ @Override
+ public Collection<HRegionLocation> getCachedHRegionLocations(final byte [] tableName,
+ boolean forceRefresh) {
+ if (forceRefresh || !initializedTableSet.contains(tableName)) {
+ prefetchRegionCache(tableName, null, Integer.MAX_VALUE);
+ initializedTableSet.add(tableName);
+ }
+
+ ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
+ getTableLocations(tableName);
+ return tableLocations.values();
+ }
+
+ /*
+ * Search the cache for a location that fits our table and row key.
+ * Return null if no suitable region is located. TODO: synchronization note
+ *
+ * <p>TODO: This method during writing consumes 15% of CPU doing lookup
+ * into the Soft Reference SortedMap. Improve.
+ *
+ * @param tableName
+ * @param row
+ * @return Null or region location found in cache.
+ */
+ HRegionLocation getCachedLocation(final byte [] tableName,
+ final byte [] row) {
+ ConcurrentSkipListMap<byte [], HRegionLocation> tableLocations =
+ getTableLocations(tableName);
+
+ // start to examine the cache. we can only do cache actions
+ // if there's something in the cache for this table.
+ if (tableLocations.isEmpty()) {
+ return null;
+ }
+
+ HRegionLocation rl = tableLocations.get(row);
+ if (rl != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cache hit for row <" + Bytes.toStringBinary(row)
+ + "> in tableName " + Bytes.toString(tableName)
+ + ": location server " + rl.getServerAddress()
+ + ", location region name "
+ + rl.getRegionInfo().getRegionNameAsString());
+ }
+ return rl;
+ }
+
+ // get the matching region for the row
+ Entry<byte[], HRegionLocation> entry = tableLocations.floorEntry(row);
+ HRegionLocation possibleRegion = (entry == null) ? null : entry
+ .getValue();
+
+ // we need to examine the cached location to verify that it is
+ // a match by end key as well.
+ if (possibleRegion != null) {
+ byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
+
+ // make sure that the end key is greater than the row we're looking
+ // for, otherwise the row actually belongs in the next region, not
+ // this one. the exception case is when the endkey is
+ // HConstants.EMPTY_START_ROW, signifying that the region we're
+ // checking is actually the last region in the table.
+ if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
+ || KeyValue.getRowComparator(tableName).compareRows(endKey, 0,
+ endKey.length, row, 0, row.length) > 0) {
+ return possibleRegion;
+ }
+ }
+
+ // Passed all the way through, so we got nothing - complete cache miss
+ return null;
+ }
+
+ /*
+ * Delete a cached location for the specified table name and row if it is
+ * located on the (optionally) specified old location.
+ */
+ @Override
+ public void deleteCachedLocation(final byte[] tableName, final byte[] row,
+ HServerAddress oldServer) {
+ synchronized (this.cachedRegionLocations) {
+ Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
+
+ // start to examine the cache. we can only do cache actions
+ // if there's something in the cache for this table.
+ if (!tableLocations.isEmpty()) {
+ HRegionLocation rl = getCachedLocation(tableName, row);
+ if (rl != null) {
+ // If oldLocation is specified. deleteLocation only if it is the
+ // same.
+ if (oldServer != null && !oldServer.equals(rl.getServerAddress()))
+ return; // perhaps, some body else cleared and repopulated.
+
+ tableLocations.remove(rl.getRegionInfo().getStartKey());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString()
+ + " for tableName=" + Bytes.toString(tableName)
+ + " from cache " + "because of " + Bytes.toStringBinary(row));
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * Delete all cached entries of a table that maps to a specific location.
+ *
+ * @param tablename
+ *
+ * @param server
+ */
+ protected void clearCachedLocationForServer(final String server) {
+ boolean deletedSomething = false;
+ synchronized (this.cachedRegionLocations) {
+ if (!cachedServers.contains(server)) {
+ return;
+ }
+ for (Map<byte[], HRegionLocation> tableLocations : cachedRegionLocations
+ .values()) {
+ for (Entry<byte[], HRegionLocation> e : tableLocations.entrySet()) {
+ if (e.getValue().getServerAddress().toString().equals(server)) {
+ tableLocations.remove(e.getKey());
+ deletedSomething = true;
+ }
+ }
+ }
+ cachedServers.remove(server);
+ }
+ if (deletedSomething && LOG.isDebugEnabled()) {
+ LOG.debug("Removed all cached region locations that map to " + server);
+ }
+ }
+
+ /*
+ * @param tableName
+ *
+ * @return Map of cached locations for passed <code>tableName</code>
+ */
+ private ConcurrentSkipListMap<byte[], HRegionLocation> getTableLocations(
+ final byte[] tableName) {
+ // find the map of cached locations for this table
+ Integer key = Bytes.mapKey(tableName);
+ ConcurrentSkipListMap<byte[], HRegionLocation> result = this.cachedRegionLocations
+ .get(key);
+ if (result == null) {
+ synchronized (this.cachedRegionLocations) {
+ result = this.cachedRegionLocations.get(key);
+ if (result == null) {
+ // if tableLocations for this table isn't built yet, make one
+ result = new ConcurrentSkipListMap<byte[], HRegionLocation>(
+ Bytes.BYTES_COMPARATOR);
+ this.cachedRegionLocations.put(key, result);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Allows flushing the region cache.
+ */
+ @Override
+ public void clearRegionCache() {
+ synchronized (this.cachedRegionLocations) {
+ cachedRegionLocations.clear();
+ cachedServers.clear();
+ }
+ }
+
+ /**
+ * Put a newly discovered HRegionLocation into the cache.
+ */
+ private void cacheLocation(final byte [] tableName,
+ final HRegionLocation location) {
+ byte [] startKey = location.getRegionInfo().getStartKey();
+ boolean hasNewCache;
+ synchronized (this.cachedRegionLocations) {
+ cachedServers.add(location.getServerAddress().toString());
+ hasNewCache = (getTableLocations(tableName).put(startKey, location) == null);
+ }
+ if (hasNewCache) {
+ LOG.debug("Cached location for " +
+ location.getRegionInfo().getRegionNameAsString() +
+ " is " + location.getServerAddress().toString());
+ }
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public HRegionInterface getHRegionConnection(
+ HServerAddress regionServer, boolean getMaster, HBaseRPCOptions options)
+ throws IOException {
+ if (getMaster) {
+ LOG.debug("Getting master");
+ getMaster();
+ }
+ HRegionInterface server = HRegionServer.getMainRS(regionServer);
+ if (server != null && !this.useThrift) {
+ return server;
+ }
+
+ final boolean thriftPortWrittenToMeta = conf.getBoolean(
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META,
+ HConstants.REGION_SERVER_WRITE_THRIFT_INFO_TO_META_DEFAULT);
+ final boolean hadoopPortWrittenToMeta = !thriftPortWrittenToMeta;
+
+ try {
+ // establish an RPC for this RS
+ // set hbase.ipc.client.connect.max.retries to retry connection
+ // attempts
+ if (this.useThrift) {
+ Class<? extends ThriftClientInterface> serverInterface =
+ ThriftHRegionInterface.class;
+ if (thriftPortWrittenToMeta) {
+ try {
+ server = (HRegionInterface) HBaseThriftRPC.getClient(
+ regionServer.getInetSocketAddress(), this.conf,
+ serverInterface, options);
+ } catch (Exception e) {
+ LOG.warn("Exception connecting to the region server on" +
+ "the thrift channel. Retrying on the HadoopRPC port", e);
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_PORT,
+ HConstants.DEFAULT_REGIONSERVER_PORT));
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID, addr, this.conf,
+ params.getRpcTimeout(), options);
+ }
+ } else {
+ try {
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_SWIFT_PORT,
+ HConstants.DEFAULT_REGIONSERVER_SWIFT_PORT));
+ server = (HRegionInterface) HBaseThriftRPC.getClient(addr,
+ this.conf, serverInterface, options);
+ } catch (Exception e) {
+ LOG.warn("Exception connecting to the region server on" +
+ "the thrift channel. Retrying on the HadoopRPC port", e);
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), options);
+ }
+ }
+ } else {
+ if (hadoopPortWrittenToMeta) {
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf,
+ params.getRpcTimeout(), options);
+ } else {
+ // The hadoop port is no longer written to Meta (this will happen
+ // when we are reasonably confident about the thrift service, and
+ // will soon deprecate RPC). So, try to connect to the default port.
+ // TODO gauravm: Verify that this works (t2830553)
+ InetSocketAddress addr = new InetSocketAddress(regionServer
+ .getInetSocketAddress().getHostName(), conf.getInt(
+ HConstants.REGIONSERVER_PORT,
+ HConstants.DEFAULT_REGIONSERVER_PORT));
+ server = (HRegionInterface) HBaseRPC.getProxy(serverInterfaceClass,
+ HBaseRPCProtocolVersion.versionID, addr, this.conf,
+ params.getRpcTimeout(), options);
+ }
+ }
+ } catch (RemoteException e) {
+ throw RemoteExceptionHandler.decodeRemoteException(e);
+ }
+
+ return server;
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+ HBaseRPCOptions options) throws IOException {
+ return getHRegionConnection(regionServer, false, options);
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+ boolean getMaster) throws IOException {
+ return getHRegionConnection(regionServer, getMaster,
+ HBaseRPCOptions.DEFAULT);
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer)
+ throws IOException {
+ return getHRegionConnection(regionServer, false);
+ }
+
+ @Override
+ public synchronized ZooKeeperWrapper getZooKeeperWrapper()
+ throws IOException {
+ return HConnectionManager.getClientZKConnection(conf)
+ .getZooKeeperWrapper();
+ }
+
+ /*
+ * Repeatedly try to find the root region in ZK
+ *
+ * @return HRegionLocation for root region if found
+ *
+ * @throws NoServerForRegionException - if the root region can not be
+ * located after retrying
+ *
+ * @throws IOException
+ */
+ private HRegionLocation locateRootRegion() throws IOException {
+
+ // We lazily instantiate the ZooKeeper object because we don't want to
+ // make the constructor have to throw IOException or handle it itself.
+ ZooKeeperWrapper zk = getZooKeeperWrapper();
+
+ HServerAddress rootRegionAddress = null;
+ for (int tries = 0; tries < params.getNumRetries(); tries++) {
+ int localTimeouts = 0;
+ // ask the master which server has the root region
+ while (rootRegionAddress == null
+ && localTimeouts < params.getNumRetries()) {
+ // Don't read root region until we're out of safe mode so we know
+ // that the meta regions have been assigned.
+ rootRegionAddress = zk.readRootRegionLocation();
+ if (rootRegionAddress == null) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping " + params.getPauseTime(tries)
+ + "ms, waiting for root region.");
+ }
+ Thread.sleep(params.getPauseTime(tries));
+ } catch (InterruptedException iex) {
+ // continue
+ }
+ localTimeouts++;
+ }
+ }
+
+ if (rootRegionAddress == null) {
+ throw new NoServerForRegionException(
+ "Timed out trying to locate root region");
+ }
+
+ LOG.debug("Trying to get root region at : " + rootRegionAddress);
+ try {
+ // Get a connection to the region server
+ HRegionInterface server = getHRegionConnection(rootRegionAddress);
+ // if this works, then we're good, and we have an acceptable address,
+ // so we can stop doing retries and return the result.
+ server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found ROOT at " + rootRegionAddress);
+ }
+ break;
+ } catch (Throwable t) {
+ t = translateException(t);
+
+ if (tries == params.getNumRetries() - 1) {
+ throw new NoServerForRegionException("Timed out trying to locate "+
+ "root region because: " + t.getMessage());
+ }
+
+ // Sleep and retry finding root region.
+ try {
+ LOG.debug("Root region location changed. Sleeping.", t);
+ Thread.sleep(params.getPauseTime(tries));
+ LOG.debug("Wake. Retry finding root region.");
+ } catch (InterruptedException iex) {
+ // continue
+ }
+ }
+
+ rootRegionAddress = null;
+ }
+
+ // if the address is null by this point, then the retries have failed,
+ // and we're sort of sunk
+ if (rootRegionAddress == null) {
+ throw new NoServerForRegionException(
+ "unable to locate root region server");
+ }
+
+ // return the region location
+ return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
+ }
+
+ @Override
+ public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
+ throws IOException {
+ List<Throwable> exceptions = new ArrayList<Throwable>();
+ RegionOverloadedException roe = null;
+
+ long callStartTime;
+ int serverRequestedRetries = 0;
+
+ callStartTime = System.currentTimeMillis();
+ long serverRequestedWaitTime = 0;
+ // do not retry if region cannot be located. There are enough retries
+ // within instantiateRegionLocation.
+ callable.instantiateRegionLocation(false /* reload cache? */);
+
+ for (int tries = 0;; tries++) {
+ // If server requested wait. We will wait for that time, and start
+ // again. Do not count this time/tries against the client retries.
+ if (serverRequestedWaitTime > 0) {
+ serverRequestedRetries++;
+
+ if (serverRequestedRetries > params.getMaxServerRequestedRetries()) {
+ throw RegionOverloadedException.create(roe, exceptions,
+ serverRequestedWaitTime);
+ }
+
+ long pauseTime = serverRequestedWaitTime + callStartTime
+ - System.currentTimeMillis();
+ LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for "
+ + pauseTime + "ms. serverRequestedRetries = "
+ + serverRequestedRetries);
+ try {
+ Thread.sleep(pauseTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+
+ serverRequestedWaitTime = 0;
+ tries = 0;
+ callStartTime = System.currentTimeMillis();
+ }
+
+ try {
+ return getRegionServerWithoutRetries(callable, false);
+ } catch (DoNotRetryIOException ioe) {
+ // clear cache if needed
+ if (ioe.getCause() instanceof NotServingRegionException) {
+ HRegionLocation prevLoc = callable.location;
+ if (prevLoc.getRegionInfo() != null) {
+ deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+ .getStartKey(), prevLoc.getServerAddress());
+ }
+ }
+
+ // If we are not supposed to retry; Let it pass through.
+ throw ioe;
+ } catch (RegionOverloadedException ex) {
+ roe = ex;
+ serverRequestedWaitTime = roe.getBackoffTimeMillis();
+ continue;
+ } catch (ClientSideDoNotRetryException exp) {
+ // Bail out of the retry loop, immediately
+ throw exp;
+ } catch (PreemptiveFastFailException pfe) {
+ // Bail out of the retry loop, if the host has been consistently
+ // unreachable.
+ throw pfe;
+ } catch (Throwable t) {
+ exceptions.add(t);
+
+ if (tries == params.getNumRetries() - 1) {
+ throw new RetriesExhaustedException(callable.getServerName(),
+ callable.getRegionName(), callable.getRow(), tries, exceptions);
+ }
+
+ HRegionLocation prevLoc = callable.location;
+ if (prevLoc.getRegionInfo() != null) {
+ deleteCachedLocation(callable.tableName, prevLoc.getRegionInfo()
+ .getStartKey(), prevLoc.getServerAddress());
+ }
+
+ try {
+ // do not retry if getting the location throws exception
+ callable.instantiateRegionLocation(false /* reload cache ? */);
+ } catch (IOException e) {
+ exceptions.add(e);
+ throw new RetriesExhaustedException(callable.getServerName(),
+ callable.getRegionName(), callable.getRow(), tries, exceptions);
+ }
+ if (prevLoc.getServerAddress().equals(
+ callable.location.getServerAddress())) {
+ // Bail out of the retry loop if we have to wait too long
+ long pauseTime = params.getPauseTime(tries);
+ if ((System.currentTimeMillis() - callStartTime + pauseTime) > params
+ .getRpcRetryTimeout()) {
+ throw new RetriesExhaustedException(callable.getServerName(),
+ callable.getRegionName(), callable.getRow(), tries,
+ exceptions);
+ }
+ LOG.debug("getRegionServerWithRetries failed, sleeping for "
+ + pauseTime + "ms. tries = " + tries, t);
+ try {
+ Thread.sleep(pauseTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ // do not reload cache. While we were sleeping hopefully the cache
+ // has been re-populated.
+ callable.instantiateRegionLocation(false);
+ } else {
+ LOG.debug("getRegionServerWithRetries failed, "
+ + "region moved from " + prevLoc + " to " + callable.location
+ + "retrying immediately tries=" + tries, t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Pass in a ServerCallable with your particular bit of logic defined and
+ * this method will pass it to the defined region server.
+ *
+ * @param <T>
+ * the type of the return value
+ * @param callable
+ * callable to run
+ * @return an object of type T
+ * @throws IOException
+ * if a remote or network exception occurs
+ * @throws RuntimeException
+ * other unspecified error
+ * @throws PreemptiveFastFailException
+ * if the remote host has been known to be unreachable for more
+ * than this.fastFailThresholdMilliSec.
+ */
+ @Override
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
+ boolean instantiateRegionLocation) throws IOException,
+ RuntimeException, PreemptiveFastFailException {
+ FailureInfo fInfo = null;
+ HServerAddress server = null;
+ boolean didTry = false;
+ MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(false);
+ boolean retryDespiteFastFailMode = false;
+ try {
+ if (instantiateRegionLocation) {
+ callable.instantiateRegionLocation(false);
+ }
+ // Logic to fast fail requests to unreachable servers.
+ server = callable.getServerAddress();
+ fInfo = repeatedFailuresMap.get(server);
+
+ if (inFastFailMode(server) && !currentThreadInFastFailMode()) {
+ // In Fast-fail mode, all but one thread will fast fail. Check
+ // if we are that one chosen thread.
+ retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+ if (retryDespiteFastFailMode == false) { // we don't have to retry
+ throw new PreemptiveFastFailException(
+ fInfo.numConsecutiveFailures.get(),
+ fInfo.timeOfFirstFailureMilliSec,
+ fInfo.timeOfLatestAttemptMilliSec, server.getHostname());
+ }
+ }
+ didTry = true;
+ callable.instantiateServer();
+ return callable.call();
+ } catch (PreemptiveFastFailException pfe) {
+ throw pfe;
+ } catch (ClientSideDoNotRetryException exp) {
+ throw exp;
+ } catch (Throwable t1) {
+ handleThrowable(t1, callable, couldNotCommunicateWithServer);
+ return null;
+ } finally {
+ updateFailureInfoForServer(server, fInfo, didTry,
+ couldNotCommunicateWithServer.booleanValue(),
+ retryDespiteFastFailMode);
+ }
+ }
+
+ @Override
+ public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
+ throws IOException, RuntimeException, PreemptiveFastFailException {
+ return this.getRegionServerWithoutRetries(callable, true);
+ }
+
+ private <T> void updateClientContext(final ServerCallable<T> callable,
+ final Throwable t) {
+ if (!recordClientContext) {
+ return;
+ }
+
+ List<OperationContext> currContext = this.operationContextPerThread.get();
+ if (currContext == null) {
+ currContext = new ArrayList<OperationContext>();
+ this.operationContextPerThread.set(currContext);
+ }
+
+ currContext.add(new OperationContext(callable.location, t));
+ }
+
+ /**
+ * Handles failures encountered when communicating with a server.
+ *
+ * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
+ * Throws RepeatedConnectException if the client is in Fast fail mode.
+ *
+ * @param server
+ * @param t
+ * - the throwable to be handled.
+ * @throws PreemptiveFastFailException
+ */
+ private void handleFailureToServer(HServerAddress server, Throwable t) {
+ if (server == null || t == null)
+ return;
+
+ long currentTime = System.currentTimeMillis();
+ FailureInfo fInfo = repeatedFailuresMap.get(server);
+ if (fInfo == null) {
+ fInfo = new FailureInfo(currentTime);
+ FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(server, fInfo);
+
+ if (oldfInfo != null) {
+ fInfo = oldfInfo;
+ }
+ }
+ fInfo.timeOfLatestAttemptMilliSec = currentTime;
+ fInfo.numConsecutiveFailures.incrementAndGet();
+
+ if (inFastFailMode(server)) {
+ // In FastFail mode, do not clear out the cache if it was done recently.
+ if (currentTime > fInfo.timeOfLatestCacheClearMilliSec
+ + cacheClearingTimeoutMilliSec) {
+ fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+ clearCachedLocationForServer(server.toString());
+ }
+ LOG.error("Exception in FastFail mode : " + t.toString());
+ return;
+ }
+
+ // if thrown these exceptions, we clear all the cache entries that
+ // map to that slow/dead server; otherwise, let cache miss and ask
+ // .META. again to find the new location
+ fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+ clearCachedLocationForServer(server.toString());
+ }
+
+ /**
+ * Occasionally cleans up unused information in repeatedFailuresMap.
+ *
+ * repeatedFailuresMap stores the failure information for all remote hosts
+ * that had failures. In order to avoid these from growing indefinitely,
+ * occassionallyCleanupFailureInformation() will clear these up once every
+ * cleanupInterval ms.
+ */
+ private void occasionallyCleanupFailureInformation() {
+ long now = System.currentTimeMillis();
+ if (!(now > lastFailureMapCleanupTimeMilliSec
+ + failureMapCleanupIntervalMilliSec))
+ return;
+
+ // remove entries that haven't been attempted in a while
+ // No synchronization needed. It is okay if multiple threads try to
+ // remove the entry again and again from a concurrent hash map.
+ StringBuilder sb = new StringBuilder();
+ for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap
+ .entrySet()) {
+ if (now > entry.getValue().timeOfLatestAttemptMilliSec
+ + failureMapCleanupIntervalMilliSec) { // no recent failures
+ repeatedFailuresMap.remove(entry.getKey());
+ } else if (now > entry.getValue().timeOfFirstFailureMilliSec
+ + this.fastFailClearingTimeMilliSec) { // been failing for a long
+ // time
+ LOG.error(entry.getKey()
+ + " been failing for a long time. clearing out."
+ + entry.getValue().toString());
+ repeatedFailuresMap.remove(entry.getKey());
+ } else {
+ sb.append(entry.getKey().toString() + " failing "
+ + entry.getValue().toString() + "\n");
+ }
+ }
+ if (sb.length() > 0
+ // If there are multiple threads cleaning up, try to see that only one
+ // will log the msg.
+ && now > this.lastFailureMapCleanupTimeMilliSec
+ + this.failureMapCleanupIntervalMilliSec) {
+ LOG.warn("Preemptive failure enabled for : " + sb.toString());
+ }
+ lastFailureMapCleanupTimeMilliSec = now;
+ }
+
+ /**
+ * Checks to see if we are in the Fast fail mode for requests to the server.
+ *
+ * If a client is unable to contact a server for more than
+ * fastFailThresholdMilliSec the client will get into fast fail mode.
+ *
+ * @param server
+ * @return true if the client is in fast fail mode for the server.
+ */
+ private boolean inFastFailMode(HServerAddress server) {
+ FailureInfo fInfo = repeatedFailuresMap.get(server);
+ // if fInfo is null --> The server is considered good.
+ // If the server is bad, wait long enough to believe that the server is
+ // down.
+ return (fInfo != null && System.currentTimeMillis() > fInfo.timeOfFirstFailureMilliSec
+ + this.fastFailThresholdMilliSec);
+ }
+
+ /**
+ * Checks to see if the current thread is already in FastFail mode for
+ * *some* server.
+ *
+ * @return true, if the thread is already in FF mode.
+ */
+ private boolean currentThreadInFastFailMode() {
+ return (this.threadRetryingInFastFailMode.get() != null && this.threadRetryingInFastFailMode
+ .get().booleanValue() == true);
+ }
+
+ /**
+ * Check to see if the client should try to connnect to the server, inspite
+ * of knowing that it is in the fast fail mode.
+ *
+ * The idea here is that we want just one client thread to be actively
+ * trying to reconnect, while all the other threads trying to reach the
+ * server will short circuit.
+ *
+ * @param fInfo
+ * @return true if the client should try to connect to the server.
+ */
+ private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+ // We believe that the server is down, But, we want to have just one
+ // client
+ // actively trying to connect. If we are the chosen one, we will retry
+ // and not throw an exception.
+ if (fInfo != null
+ && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false,
+ true)) {
+ MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
+ .get();
+ if (threadAlreadyInFF == null) {
+ threadAlreadyInFF = new MutableBoolean();
+ this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
+ }
+ threadAlreadyInFF.setValue(true);
+
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * updates the failure information for the server.
+ *
+ * @param server
+ * @param fInfo
+ * @param couldNotCommunicate
+ * @param retryDespiteFastFailMode
+ */
+ private void updateFailureInfoForServer(HServerAddress server,
+ FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
+ boolean retryDespiteFastFailMode) {
+ if (server == null || fInfo == null || didTry == false)
+ return;
+
+ // If we were able to connect to the server, reset the failure
+ // information.
+ if (couldNotCommunicate == false) {
+ LOG.info("Clearing out PFFE for server " + server.getHostname());
+ repeatedFailuresMap.remove(server);
+ } else {
+ // update time of last attempt
+ long currentTime = System.currentTimeMillis();
+ fInfo.timeOfLatestAttemptMilliSec = currentTime;
+
+ // Release the lock if we were retrying inspite of FastFail
+ if (retryDespiteFastFailMode) {
+ fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+ threadRetryingInFastFailMode.get().setValue(false);
+ }
+ }
+
+ occasionallyCleanupFailureInformation();
+ }
+
+ /**
+  * Records the outcome of an attempt to reach {@code server}. When the
+  * server is in fast-fail mode and the current thread is not already the
+  * designated retrier, the thread first tries to become that retrier.
+  */
+ public void updateFailureInfoForServer(HServerAddress server,
+     boolean didTry, boolean couldNotCommunicate) {
+   FailureInfo info = repeatedFailuresMap.get(server);
+   // In fast-fail mode all but one thread fast fail; see whether we are the
+   // chosen one (only checked when both preconditions hold).
+   boolean chosenRetrier = inFastFailMode(server)
+       && !currentThreadInFastFailMode()
+       && shouldRetryInspiteOfFastFail(info);
+   updateFailureInfoForServer(server, info, didTry, couldNotCommunicate,
+       chosenRetrier);
+ }
+
+ /**
+  * Translates {@code t1}, flags a local network error as a failure to
+  * communicate with the callable's server, updates the client context and
+  * then always rethrows: as-is when the translated throwable is an
+  * IOException, wrapped in a RuntimeException otherwise.
+  */
+ public void handleThrowable(Throwable t1, ServerCallable<?> callable,
+     MutableBoolean couldNotCommunicateWithServer)
+     throws IOException {
+   // translateException throws DoNotRetryException or any
+   // non-IOException.
+   Throwable translated = translateException(t1);
+   boolean remote = translated instanceof RemoteException;
+   if (!remote && isNetworkException(translated)) {
+     couldNotCommunicateWithServer.setValue(true);
+     handleFailureToServer(callable.getServerAddress(), translated);
+   }
+
+   updateClientContext(callable, translated);
+   if (!(translated instanceof IOException)) {
+     throw new RuntimeException(translated);
+   }
+   throw (IOException) translated;
+ }
+
+ /**
+  * Builds a task that asks the region server at {@code address} for its
+  * start code, going through getRegionServerWithoutRetries.
+  */
+ private Callable<Long> createGetServerStartCodeCallable(
+     final HServerAddress address, final HBaseRPCOptions options) {
+   final HConnection conn = this;
+   return new Callable<Long>() {
+     @Override
+     public Long call() throws IOException {
+       ServerCallableForBatchOps<Long> startCodeCall =
+           new ServerCallableForBatchOps<Long>(conn, address, options) {
+             @Override
+             public Long call() throws IOException {
+               return server.getStartCode();
+             }
+           };
+       return getRegionServerWithoutRetries(startCodeCall);
+     }
+   };
+ }
+
+ /**
+  * Builds a task that asks the region server at {@code address} for its
+  * current time in milliseconds, going through getRegionServerWithoutRetries.
+  */
+ private Callable<Long> createCurrentTimeCallable(
+     final HServerAddress address, final HBaseRPCOptions options) {
+   final HConnection conn = this;
+   return new Callable<Long>() {
+     @Override
+     public Long call() throws IOException {
+       ServerCallableForBatchOps<Long> clockCall =
+           new ServerCallableForBatchOps<Long>(conn, address, options) {
+             @Override
+             public Long call() throws IOException {
+               return server.getCurrentTimeMillis();
+             }
+           };
+       return getRegionServerWithoutRetries(clockCall);
+     }
+   };
+ }
+
+ /**
+  * Builds a task that fetches the last flush times reported by the region
+  * server at {@code address}, going through getRegionServerWithoutRetries.
+  */
+ private Callable<MapWritable> createGetLastFlushTimesCallable(
+     final HServerAddress address, final HBaseRPCOptions options) {
+   final HConnection conn = this;
+   return new Callable<MapWritable>() {
+     @Override
+     public MapWritable call() throws IOException {
+       ServerCallableForBatchOps<MapWritable> flushTimesCall =
+           new ServerCallableForBatchOps<MapWritable>(conn, address, options) {
+             @Override
+             public MapWritable call() throws IOException {
+               return server.getLastFlushTimes();
+             }
+           };
+       return getRegionServerWithoutRetries(flushTimesCall);
+     }
+   };
+ }
+
+ /**
+  * Builds a task that asks the region server at {@code address} to flush
+  * {@code region} with the given target flush time, going through
+  * getRegionServerWithoutRetries. The task's result is always null.
+  */
+ private Callable<Void> createFlushCallable(final HServerAddress address,
+     final HRegionInfo region, final long targetFlushTime,
+     final HBaseRPCOptions options) {
+   final HConnection conn = this;
+   return new Callable<Void>() {
+     @Override
+     public Void call() throws IOException {
+       ServerCallableForBatchOps<Void> flushCall =
+           new ServerCallableForBatchOps<Void>(conn, address, options) {
+             @Override
+             public Void call() throws IOException {
+               server.flushRegion(region.getRegionName(), targetFlushTime);
+               return null;
+             }
+           };
+       return getRegionServerWithoutRetries(flushCall);
+     }
+   };
+ }
+
+ /**
+  * Builds a task that sends {@code multi} to the region server at
+  * {@code address} via getRegionServerWithoutRetries. {@code tableName} is
+  * unused here: mutation tracking is done by the caller.
+  */
+ private Callable<MultiResponse> createMultiActionCallable(
+     final HServerAddress address, final MultiAction multi,
+     final byte[] tableName, final HBaseRPCOptions options) {
+   final HConnection conn = this;
+   return new Callable<MultiResponse>() {
+     @Override
+     public MultiResponse call() throws IOException {
+       ServerCallableForBatchOps<MultiResponse> multiCall =
+           new ServerCallableForBatchOps<MultiResponse>(conn, address, options) {
+             @Override
+             public MultiResponse call() throws IOException,
+                 InterruptedException, ExecutionException {
+               return server.multiAction(multi);
+             }
+           };
+       return getRegionServerWithoutRetries(multiCall, true);
+     }
+   };
+ }
+
+ private HRegionLocation getRegionLocationForRowWithRetries(
+ byte[] tableName, byte[] rowKey, boolean reload) throws IOException {
+ boolean reloadFlag = reload;
+ List<Throwable> exceptions = new ArrayList<Throwable>();
+ HRegionLocation location = null;
+ int tries = 0;
+ for (; tries < params.getNumRetries();) {
+ try {
+ location = getRegionLocation(tableName, rowKey, reloadFlag);
+ } catch (Throwable t) {
+ exceptions.add(t);
+ }
+ if (location != null) {
+ break;
+ }
+ reloadFlag = true;
+ tries++;
+ try {
+ Thread.sleep(params.getPauseTime(tries));
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ if (location == null) {
+ throw new RetriesExhaustedException(
+ " -- nothing found, no 'location' returned," + " tableName="
+ + Bytes.toString(tableName) + ", reload=" + reload + " --",
+ HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions);
+ }
+ return location;
+ }
+
+ /**
+  * Groups the non-null rows of {@code workingList} by the region server that
+  * hosts them, one MultiAction per server. Gets are recorded with their index
+  * in {@code workingList}; mutations are also passed to trackMutationsToTable.
+  */
+ private <R extends Row> Map<HServerAddress, MultiAction> splitRequestsByRegionServer(
+     List<R> workingList, final byte[] tableName, boolean isGets)
+     throws IOException {
+   Map<HServerAddress, MultiAction> byServer = new HashMap<HServerAddress, MultiAction>();
+   for (int idx = 0; idx < workingList.size(); idx++) {
+     Row row = workingList.get(idx);
+     if (row == null) {
+       // null entries are skipped
+       continue;
+     }
+     HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
+     byte[] regionName = loc.getRegionInfo().getRegionName();
+
+     HServerAddress serverAddress = loc.getServerAddress();
+     MultiAction serverActions = byServer.get(serverAddress);
+     if (serverActions == null) {
+       serverActions = new MultiAction();
+       byServer.put(serverAddress, serverActions);
+     }
+
+     if (!isGets) {
+       trackMutationsToTable(tableName, loc);
+       serverActions.mutate(regionName, (Mutation) row);
+     } else {
+       serverActions.addGet(regionName, (Get) row, idx);
+     }
+   }
+   return byServer;
+ }
+
+ private Map<HServerAddress, Future<MultiResponse>> makeServerRequests(
+ Map<HServerAddress, MultiAction> actionsByServer,
+ final byte[] tableName, ExecutorService pool, HBaseRPCOptions options) {
+
+ Map<HServerAddress, Future<MultiResponse>> futures = new HashMap<HServerAddress, Future<MultiResponse>>(
+ actionsByServer.size());
+
+ boolean singleServer = (actionsByServer.size() == 1);
+ for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
+ Callable<MultiResponse> callable = createMultiActionCallable(
+ e.getKey(), e.getValue(), tableName, options);
+ Future<MultiResponse> task;
+ if (singleServer) {
+ task = new FutureTask<MultiResponse>(callable);
+ ((FutureTask<MultiResponse>) task).run();
+ } else {
+ task = pool.submit(callable);
+ }
+ futures.put(e.getKey(), task);
+ }
+ return futures;
+ }
+
+ /*
+  * Collects the response of every RegionServer for a batch of mutations.
+  * Returns the mutations that have to be retried, or null if none failed.
+  * A DoNotRetryIOException raised by any server is rethrown immediately;
+  * InterruptedException from waiting on a response propagates unchanged.
+  */
+ private List<Mutation> collectResponsesForMutateFromAllRS(byte[] tableName,
+     Map<HServerAddress, MultiAction> actionsByServer,
+     Map<HServerAddress, Future<MultiResponse>> futures,
+     Map<String, HRegionFailureInfo> failureInfo)
+     throws InterruptedException, IOException {
+
+   List<Mutation> newWorkingList = null;
+   for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+       .entrySet()) {
+     HServerAddress address = responsePerServer.getKey();
+     MultiAction request = actionsByServer.get(address);
+
+     MultiResponse resp = null;
+     try {
+       resp = responsePerServer.getValue().get();
+     } catch (ExecutionException e) {
+       Throwable cause = e.getCause();
+       if (cause instanceof DoNotRetryIOException) {
+         throw (DoNotRetryIOException) cause;
+       }
+       // Leave resp null so this server's mutations go back on the working
+       // list below; log the cause, which would otherwise be lost.
+       LOG.warn("Multi-action request to server " + address
+           + " failed; its mutations will be retried", cause);
+     }
+
+     // Go through the responses region by region for the Deletes and Puts.
+     // If the response is null, they are added to newWorkingList here.
+     if (request.getDeletes() != null) {
+       newWorkingList = processMutationResponseFromOneRegionServer(
+           tableName, address, resp, request.getDeletes(), newWorkingList,
+           true, failureInfo);
+     }
+     if (request.getPuts() != null) {
+       newWorkingList = processMutationResponseFromOneRegionServer(
+           tableName, address, resp, request.getPuts(), newWorkingList,
+           false, failureInfo);
+     }
+   }
+   return newWorkingList;
+ }
+
+ /*
+  * Collects the response of every RegionServer for a batch of Gets and
+  * fills in the result array. Returns the Gets that have to be retried, or
+  * null if none failed. A DoNotRetryIOException raised by any server is
+  * rethrown immediately; InterruptedException propagates unchanged.
+  */
+ private List<Get> collectResponsesForGetFromAllRS(byte[] tableName,
+     Map<HServerAddress, MultiAction> actionsByServer,
+     Map<HServerAddress, Future<MultiResponse>> futures,
+     List<Get> orig_list, Result[] results,
+     Map<String, HRegionFailureInfo> failureInfo) throws IOException,
+     InterruptedException {
+
+   List<Get> newWorkingList = null;
+   for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures
+       .entrySet()) {
+     HServerAddress address = responsePerServer.getKey();
+     MultiAction request = actionsByServer.get(address);
+
+     MultiResponse resp = null;
+     try {
+       resp = responsePerServer.getValue().get();
+     } catch (ExecutionException e) {
+       Throwable cause = e.getCause();
+       if (cause instanceof DoNotRetryIOException) {
+         throw (DoNotRetryIOException) cause;
+       }
+       // Log through the class logger instead of printing to stderr; resp
+       // stays null and the server's Gets are handled as failed below.
+       LOG.warn("Multi-action request to server " + address + " failed", cause);
+     }
+
+     if (resp == null) {
+       // Entire server failed
+       LOG.debug("Failed all for server: " + address
+           + ", removing from cache");
+     }
+
+     newWorkingList = processGetResponseFromOneRegionServer(tableName,
+         address, request, resp, orig_list, newWorkingList, results,
+         failureInfo);
+   }
+   return newWorkingList;
+ }
+
+ /**
+  * Walks the per-region results of one server's response for either the
+  * Deletes ({@code isDelete}) or the Puts in {@code map}, appending to
+  * {@code newWorkingList} (created on demand) the mutations to retry:
+  * <ul>
+  * <li>a region result other than -1 is used as the index from which that
+  * region's mutations are re-queued;</li>
+  * <li>if reading the result fails (including the NullPointerException
+  * raised by a null {@code resp}), all of the region's mutations are
+  * re-queued, the failure is recorded in {@code failureInfo} and the cached
+  * location of the region is dropped.</li>
+  * </ul>
+  *
+  * @return the (possibly still null) list of mutations to retry
+  */
+ private <R extends Mutation> List<Mutation> processMutationResponseFromOneRegionServer(
+     byte[] tableName, HServerAddress address, MultiResponse resp,
+     Map<byte[], List<R>> map, List<Mutation> newWorkingList,
+     boolean isDelete, Map<String, HRegionFailureInfo> failureInfo)
+     throws IOException {
+   for (Map.Entry<byte[], List<R>> regionEntry : map.entrySet()) {
+     byte[] regionName = regionEntry.getKey();
+     List<R> regionOps = regionEntry.getValue();
+
+     try {
+       long requeueFrom;
+       if (isDelete) {
+         requeueFrom = resp.getDeleteResult(regionName);
+       } else {
+         requeueFrom = resp.getPutResult(regionName);
+       }
+
+       if (requeueFrom == -1) {
+         continue;
+       }
+       if (newWorkingList == null) {
+         newWorkingList = new ArrayList<Mutation>();
+       }
+       newWorkingList.addAll(regionOps.subList((int) requeueFrom,
+           regionOps.size()));
+     } catch (Exception ex) {
+       String serverName = address.getHostname();
+       String regName = Bytes.toStringBinary(regionName);
+       // A null response surfaces here as a NullPointerException.
+       translateException(ex);
+
+       HRegionFailureInfo regionFailure = failureInfo.get(regName);
+       if (regionFailure == null) {
+         regionFailure = new HRegionFailureInfo(regName);
+         failureInfo.put(regName, regionFailure);
+       }
+       regionFailure.addException(ex);
+       regionFailure.setServerName(serverName);
+
+       if (newWorkingList == null) {
+         newWorkingList = new ArrayList<Mutation>();
+       }
+       newWorkingList.addAll(regionOps);
+       // Dropping the cached location of one row is enough for the region.
+       deleteCachedLocation(tableName, regionOps.get(0).getRow(), address);
+     }
+   }
+   return newWorkingList;
+ }
+
+ /**
+  * Copies the Get results returned by one region server into
+  * {@code results} and collects, for retry, the Gets of every region whose
+  * result could not be read (including when {@code resp} is null).
+  *
+  * <p>Each failed Get is stored at its original index in the returned list,
+  * which is null-padded to {@code orig_list.size()} so that indices stay
+  * aligned with {@code results}; splitRequestsByRegionServer skips the null
+  * slots when the list is split by region server again.
+  *
+  * @param results receives the results at their original indices; may be
+  *          null, in which case results are not stored
+  * @return the list of Gets to retry, or {@code newWorkingList} unchanged
+  *         (possibly null) if every region of this server succeeded
+  */
+ private List<Get> processGetResponseFromOneRegionServer(byte[] tableName,
+     HServerAddress address, MultiAction request, MultiResponse resp,
+     List<Get> orig_list, List<Get> newWorkingList, Result[] results,
+     Map<String, HRegionFailureInfo> failureInfo) throws IOException {
+
+   for (Map.Entry<byte[], List<Get>> e : request.getGets().entrySet()) {
+     byte[] regionName = e.getKey();
+     List<Get> regionGets = e.getValue();
+     List<Integer> origIndices = request.getOriginalIndex().get(regionName);
+
+     Result[] getResult;
+     try {
+       // If response is null, we will catch a NPE here.
+       getResult = resp.getGetResult(regionName);
+
+       // fill up the result[] array accordingly
+       assert (origIndices.size() == getResult.length);
+       // A null results array is accepted by processBatchedGets; writing into
+       // it would raise an NPE that is then mistaken for a region failure.
+       if (results != null) {
+         for (int i = 0; i < getResult.length; i++) {
+           results[origIndices.get(i)] = getResult[i];
+         }
+       }
+     } catch (Exception ex) {
+       translateException(ex);
+
+       if (newWorkingList == null) {
+         // Pre-size with nulls so each Get can be set() at its original
+         // index; add(index, ...) on an empty list throws
+         // IndexOutOfBoundsException and would shift elements anyway.
+         newWorkingList = new ArrayList<Get>(
+             Collections.nCopies(orig_list.size(), (Get) null));
+       }
+
+       String serverName = address.getHostname();
+       String regName = Bytes.toStringBinary(regionName);
+
+       HRegionFailureInfo regionFailure = failureInfo.get(regName);
+       if (regionFailure == null) {
+         regionFailure = new HRegionFailureInfo(regName);
+         failureInfo.put(regName, regionFailure);
+       }
+       regionFailure.addException(ex);
+       regionFailure.setServerName(serverName);
+
+       // Put every failed Get back at its original position.
+       for (int i = 0; i < regionGets.size(); i++) {
+         newWorkingList.set(origIndices.get(i), regionGets.get(i));
+       }
+
+       // enough to clear this once for a region
+       deleteCachedLocation(tableName, regionGets.get(0).getRow(), address);
+     }
+   }
+
+   return newWorkingList;
+ }
+
+ /**
+  * Sends the mutations in {@code orig_list} to their region servers, one
+  * multi-action per server, and retries the ones that failed for up to
+  * {@code params.getNumRetries()} rounds with a pause between rounds.
+  *
+  * <p>Mutations are sent sorted by row key to optimize row lock acquisition
+  * on the server side. The sort is applied to a copy, so the caller's list
+  * is not reordered.
+  *
+  * @param failures if non-null, receives the mutations still failing once
+  *          the retries are exhausted
+  * @throws RetriesExhaustedException if some mutations still fail after the
+  *           last retry
+  * @throws InterruptedException if interrupted while pausing between rounds
+  *           or waiting for a server response
+  */
+ @Override
+ public void processBatchedMutations(List<Mutation> orig_list,
+     final byte[] tableName, ExecutorService pool, List<Mutation> failures,
+     HBaseRPCOptions options) throws IOException, InterruptedException {
+
+   Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
+   // Sort a copy: reordering the caller's list is an unexpected side effect.
+   List<Mutation> workingList = new ArrayList<Mutation>(orig_list);
+   Collections.sort(workingList);
+
+   for (int tries = 0; workingList != null && !workingList.isEmpty()
+       && tries < params.getNumRetries(); ++tries) {
+
+     if (tries >= 1) {
+       long sleepTime = params.getPauseTime(tries);
+       LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!");
+       Thread.sleep(sleepTime);
+     }
+
+     // step 1: break up into regionserver-sized chunks and build the data
+     // structs
+     Map<HServerAddress, MultiAction> actionsByServer = splitRequestsByRegionServer(
+         workingList, tableName, false);
+
+     // step 2: make the requests
+     Map<HServerAddress, Future<MultiResponse>> futures = makeServerRequests(
+         actionsByServer, tableName, pool, options);
+
+     // step 3: collect the failures and successes and prepare for retry
+     workingList = collectResponsesForMutateFromAllRS(tableName,
+         actionsByServer, futures, failureInfo);
+   }
+
+   if (workingList != null && !workingList.isEmpty()) {
+     if (failures != null) {
+       failures.addAll(workingList);
+     }
+     throw new RetriesExhaustedException(failureInfo, workingList.size()
+         + " mutate operations remaining after " + params.getNumRetries()
+         + " retries");
+   }
+ }
+
+ @Override
+ public void processBatchedGets(List<Get> orig_list, final byte[] tableName,
+ ExecutorService pool, Result[] results, HBaseRPCOptions options)
+ throws IOException, InterruptedException {
+
+ Map<String, HRegionFailureInfo> failureInfo = new HashMap<String, HRegionFailureInfo>();
+ // if results is not NULL
+ // results must be the same size as list
+ if (results != null && (results.length != orig_list.size())) {
+ throw new IllegalArgumentException(
+ "argument results must be the same size as argument list");
+ }
+
+ // Keep track of the most recent servers for any given item for better
+ // exception reporting. We keep HRegionLocation to save on parsing.
+ // Later below when we use lastServers, we'll pull what we need from
+ // lastServers.
+ List<Get> workingList = orig_list;
+
+ for (int tries = 0; workingList != null && !workingList.isEmpty()
+ && tries < params.getNumRetries(); ++tries) {
+
+ if (tries >= 1) {
[... 1184 lines stripped ...]