You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/28 22:58:20 UTC
svn commit: r1378337 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
client/HConnectionManager.java client/HTableMultiplexer.java
client/PreemptiveFastFailException.java client/ServerCallable.java
ipc/HBaseClient.java
Author: mbautin
Date: Tue Aug 28 20:58:20 2012
New Revision: 1378337
URL: http://svn.apache.org/viewvc?rev=1378337&view=rev
Log:
[HBASE-6629] [0.89-fb] Fast fail, without retries, if the client has not been able to contact a server in a while
Author: aaiyer
Summary:
New way to improve client-side waiting for an unresponsive regionserver.
1) keep count of repeated connect failures to a server.
2) throw RepeatedConnectException instead of ConnectException.
This contains more details such as
- # of failures
- time of first failure
show rpcRetryTimeout and current time
Test Plan:
Run unit tests on MR to test for existing stuff. -- All pass.
Deploy to dev cluster.
- run a load tester job,
- kill one of the RS during the load testing job
- see that the RepeatedConnectExceptions get thrown
- see the # of ops should be better with the diff.
Graph for #ops and latency (ms) uploaded to
https://www.facebook.com/pxlcld/8hnw
https://www.facebook.com/pxlcld/8hnh
Reviewers: kranganathan, kannan, avf
Reviewed By: kranganathan
CC: hbase-eng@, riyer, anshumansingh26, liujiakai
Differential Revision: https://phabricator.fb.com/D535439
Task ID: 1285514
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Aug 28 20:58:20 2012
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -39,12 +40,15 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -376,6 +380,44 @@ public class HConnectionManager {
cachedRegionLocations =
new HashMap<Integer, SortedMap<byte [], HRegionLocation>>();
+ // amount of time to wait before we consider a server to be in fast fail mode
+ private long fastFailThresholdMilliSec;
+ // Keeps track of failures when we cannot talk to a server. Helps in
+ // fast failing clients if the server is down for a long time.
+ private final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap =
+ new ConcurrentHashMap<HServerAddress, FailureInfo>();
+ // We populate repeatedFailuresMap every time there is a failure. So, to keep it
+ // from growing unbounded, we garbage collect the failure information
+ // every cleanupInterval.
+ private final long failureMapCleanupIntervalMilliSec;
+ private volatile long lastFailureMapCleanupTimeMilliSec;
+ // Amount of time that has to pass, before we clear region -> regionserver cache
+ // again, when in fast fail mode.
+ private long cacheClearingTimeoutMilliSec;
+
+ /**
+ * Keeps track of repeated failures to any region server.
+ * @author amitanand.s
+ *
+ */
+ private class FailureInfo {
+ // The number of consecutive failures.
+ private final AtomicLong numConsecutiveFailures = new AtomicLong();
+ // The time when the server started to become unresponsive
+ // Once set, this would never be updated.
+ private long timeOfFirstFailureMilliSec;
+ // The time when the client last tried to contact the server.
+ // This is only updated by one client at a time
+ private volatile long timeOfLatestAttemptMilliSec;
+ // The time when the client last cleared cache for regions assigned
+ // to the server. Used to ensure we don't clearCache too often.
+ private volatile long timeOfLatestCacheClearMilliSec;
+ // Used to keep track of concurrent attempts to contact the server.
+ // In Fast fail mode, we want just one client thread to try to connect
+ // the rest of the client threads will fail fast.
+ private final AtomicBoolean
+ exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(false);
+ }
// The presence of a server in the map implies it's likely that there is an
// entry in cachedRegionLocations that map to this server; but the absence
// of a server in this map guarentees that there is no entry in cache that
@@ -426,6 +468,12 @@ public class HConnectionManager {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.rpcRetryTimeout = conf.getLong("hbase.client.rpc.retry.timeout",
Long.MAX_VALUE);
+ this.cacheClearingTimeoutMilliSec = conf.getLong("hbase.client.fastfail.cache.clear.interval",
+ 10000); // 10 sec
+ this.fastFailThresholdMilliSec = conf.getLong("hbase.client.fastfail.threshold",
+ 60000); // 1 min
+ this.failureMapCleanupIntervalMilliSec = conf.getLong(
+ "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
10);
@@ -863,10 +911,31 @@ public class HConnectionManager {
+ Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
}
+ FailureInfo fInfo = null;
+ HServerAddress server = null;
+ boolean didTry = false;
+ boolean couldNotCommunicateWithServer = false;
+ boolean retryDespiteFastFailMode = false;
try {
// locate the root or meta region
HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
- HRegionInterface server =
+
+ server = metaLocation.getServerAddress();
+ // Handle the case where .META. is on an unresponsive server.
+ if (inFastFailMode(server)) {
+ // In Fast-fail mode, all but one thread will fast fail. Check
+ // if we are that one chosen thread.
+ fInfo = repeatedFailuresMap.get(server);
+ retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+ if (retryDespiteFastFailMode == false) { // we don't have to retry
+ throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+ fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+ }
+ }
+ didTry = true;
+
+ HRegionInterface serverInterface =
getHRegionConnection(metaLocation.getServerAddress());
Result regionInfoRow = null;
@@ -895,7 +964,7 @@ public class HConnectionManager {
}
// Query the root or meta region for the location of the meta region
- regionInfoRow = server.getClosestRowBefore(
+ regionInfoRow = serverInterface.getClosestRowBefore(
metaLocation.getRegionInfo().getRegionName(), metaKey,
HConstants.CATALOG_FAMILY);
}
@@ -948,6 +1017,9 @@ public class HConnectionManager {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
+ } else if (isNetworkException(e)) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server);
}
if (tries < numRetries - 1) {
if (LOG.isDebugEnabled()) {
@@ -963,6 +1035,9 @@ public class HConnectionManager {
e instanceof NoServerForRegionException)) {
relocateRegion(parentTable, metaKey);
}
+ } finally {
+ updateFailureInfoForServer(server, fInfo, didTry,
+ couldNotCommunicateWithServer, retryDespiteFastFailMode);
}
try{
Thread.sleep(getPauseTime(tries));
@@ -972,6 +1047,24 @@ public class HConnectionManager {
}
}
+    /**
+     * Tells whether an exception is one that indicates we cannot
+     * contact/communicate with the server.
+     *
+     * @param e the exception to classify
+     * @return true if e is an instance of one of the connectivity exception
+     *         types checked below, false otherwise
+     */
+    private boolean isNetworkException(Throwable e) {
+      // This list covers most connectivity exceptions but not all.
+      // For example, in SocketOutputStream a plain IOException is thrown
+      // at times when the channel is closed.
+      if (e instanceof SocketTimeoutException) {
+        return true;
+      }
+      if (e instanceof ConnectException) {
+        return true;
+      }
+      if (e instanceof ClosedChannelException) {
+        return true;
+      }
+      if (e instanceof SyncFailedException) {
+        return true;
+      }
+      return e instanceof EOFException;
+    }
+
/*
* Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located. TODO: synchronization note
@@ -1082,6 +1175,7 @@ public class HConnectionManager {
private void clearCachedLocationForServer(
final String server) {
boolean deletedSomething = false;
+
synchronized (this.cachedRegionLocations) {
if (!cachedServers.contains(server)) {
return;
@@ -1345,14 +1439,16 @@ public class HConnectionManager {
// If we are not supposed to retry; Let it pass through.
throw ioe;
+ } catch (RegionOverloadedException roe) {
+ exceptions.add(roe);
+ serverRequestedWaitTime = roe.getBackoffTimeMillis();
+ continue;
+ } catch (PreemptiveFastFailException pfe) {
+ // Bail out of the retry loop, if the host has been consistently unreachable.
+ throw pfe;
} catch (Throwable t) {
exceptions.add(t);
- if (t instanceof RegionOverloadedException) {
- serverRequestedWaitTime = ((RegionOverloadedException)t).getBackoffTimeMillis();
- continue;
- }
-
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries, exceptions);
@@ -1369,6 +1465,7 @@ public class HConnectionManager {
if (prevLoc.getServerAddress().
equals(callable.location.getServerAddress())) {
+ // Bail out of the retry loop if we have to wait too long
long pauseTime = getPauseTime(tries);
if ((System.currentTimeMillis() - callStartTime + pauseTime) >
rpcRetryTimeout) {
@@ -1402,6 +1499,7 @@ public class HConnectionManager {
return getRegionServerWithoutRetries(callable, true);
}
+
/**
* Pass in a ServerCallable with your particular bit of logic defined and
* this method will pass it to the defined region server.
@@ -1410,32 +1508,46 @@ public class HConnectionManager {
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
+ * @throws PreemptiveFastFailException if the remote host has been known to be
+ * unreachable for more than this.fastFailThresholdMilliSec.
*/
private <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
boolean instantiateRegionLocation)
- throws IOException, RuntimeException {
+ throws IOException, RuntimeException, PreemptiveFastFailException {
+ FailureInfo fInfo = null;
+ HServerAddress server = null;
+ boolean didTry = false;
+ boolean couldNotCommunicateWithServer = false;
+ boolean retryDespiteFastFailMode = false;
try {
if (instantiateRegionLocation) callable.instantiateRegionLocation(false);
+
+ // Logic to fast fail requests to unreachable servers.
+ server = callable.getServerAddress();
+ if (inFastFailMode(server)) {
+ // In Fast-fail mode, all but one thread will fast fail. Check
+ // if we are that one chosen thread.
+ fInfo = repeatedFailuresMap.get(server);
+ retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+ if (retryDespiteFastFailMode == false) { // we don't have to retry
+ throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+ fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+ }
+ }
+ didTry = true;
+
callable.instantiateServer();
return callable.call();
+ } catch (PreemptiveFastFailException pfe) {
+ throw pfe;
} catch (Throwable t1) {
Throwable t2 = translateException(t1);
boolean isLocalException = !(t2 instanceof RemoteException);
// translateException throws DoNotRetryException or any
// non-IOException.
- if (isLocalException && (t2 instanceof SocketTimeoutException ||
- t2 instanceof ConnectException ||
- t2 instanceof ClosedChannelException ||
- t2 instanceof SyncFailedException ||
- t2 instanceof EOFException)) {
- // XXX this list covers most connectivity exceptions but not all.
- // For example, in SocketOutputStream a plain IOException is thrown
- // at times when the channel is closed.
-
- // if thrown these exceptions, we clear all the cache entries that
- // map to that slow/dead server; otherwise, let cache miss and ask
- // .META. again to find the new location
- clearCachedLocationForServer(callable.location.getServerAddress().toString());
+ if (isLocalException && isNetworkException(t2)) {
+ couldNotCommunicateWithServer = true;
+ handleFailureToServer(server);
}
if (t2 instanceof IOException) {
@@ -1443,9 +1555,142 @@ public class HConnectionManager {
} else {
throw new RuntimeException(t2);
}
+ } finally {
+ updateFailureInfoForServer(server, fInfo, didTry,
+ couldNotCommunicateWithServer, retryDespiteFastFailMode);
}
}
+    /**
+     * Handles failures encountered when communicating with a server.
+     *
+     * Updates the FailureInfo in repeatedFailuresMap to reflect the
+     * failure, creating the record on the first failure. Throws
+     * PreemptiveFastFailException if the client is in fast fail mode
+     * for the server.
+     *
+     * @param server the server we failed to communicate with; a null
+     *          server is ignored
+     * @throws PreemptiveFastFailException if the client is in fast fail
+     *           mode for the server
+     */
+    private void handleFailureToServer(HServerAddress server)
+        throws PreemptiveFastFailException {
+      if (server == null) return;
+
+      long currentTime = System.currentTimeMillis();
+      FailureInfo fInfo = repeatedFailuresMap.get(server);
+      if (fInfo == null) {
+        FailureInfo created = new FailureInfo();
+        created.timeOfFirstFailureMilliSec = currentTime;
+        // putIfAbsent returns the PREVIOUS mapping: null when our record was
+        // installed, or the record another thread installed first. Assigning
+        // its result directly would leave fInfo null on the common path.
+        FailureInfo existing = repeatedFailuresMap.putIfAbsent(server, created);
+        fInfo = (existing != null) ? existing : created;
+      }
+      fInfo.timeOfLatestAttemptMilliSec = currentTime;
+      fInfo.numConsecutiveFailures.incrementAndGet();
+
+      if (inFastFailMode(server)) {
+        // In fast fail mode, do not clear out the cache if it was done recently.
+        if (currentTime > fInfo.timeOfLatestCacheClearMilliSec + cacheClearingTimeoutMilliSec) {
+          fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+          clearCachedLocationForServer(server.toString());
+        }
+        throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+            fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+      }
+
+      // if thrown these exceptions, we clear all the cache entries that
+      // map to that slow/dead server; otherwise, let cache miss and ask
+      // .META. again to find the new location
+      fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+      clearCachedLocationForServer(server.toString());
+    }
+
+    /**
+     * Occasionally cleans up unused information in repeatedFailuresMap.
+     *
+     * repeatedFailuresMap stores the failure information for all
+     * remote hosts that had failures. In order to keep it from growing
+     * indefinitely, this method removes stale entries at most once
+     * every failureMapCleanupIntervalMilliSec.
+     */
+    private void occasionallyCleanupFailureInformation() {
+      long now = System.currentTimeMillis();
+      if (!(now > lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec))
+        return;
+
+      // remove entries that haven't been attempted in a while
+      // No synchronization needed. It is okay if multiple threads try to
+      // remove the entry again and again from a concurrent hash map.
+      lastFailureMapCleanupTimeMilliSec = now;
+      for (Entry<HServerAddress, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
+        FailureInfo staleCandidate = entry.getValue();
+        if (now > staleCandidate.timeOfLatestAttemptMilliSec
+            + failureMapCleanupIntervalMilliSec) {
+          // Conditional remove: only drop the exact record we judged stale,
+          // so a fresh record installed concurrently for the same server
+          // is not thrown away.
+          repeatedFailuresMap.remove(entry.getKey(), staleCandidate);
+        }
+      }
+    }
+
+    /**
+     * Checks to see if we are in the fast fail mode for requests to the server.
+     *
+     * If a client is unable to contact a server for more than
+     * fastFailThresholdMilliSec, the client will get into fast fail mode.
+     *
+     * @param server the server to check; may be null
+     * @return true if the client is in fast fail mode for the server;
+     *         false for a null server or a server with no recorded failures
+     */
+    private boolean inFastFailMode(HServerAddress server) {
+      // repeatedFailuresMap is a ConcurrentHashMap, which throws on a null
+      // key, and ServerCallable.getServerAddress() returns null when no
+      // location is set. Treat an unknown server as not in fast fail mode.
+      if (server == null) {
+        return false;
+      }
+      FailureInfo fInfo = repeatedFailuresMap.get(server);
+      // if fInfo is null --> The server is considered good.
+      // If the server is bad, wait long enough to believe that the server is down.
+      return (fInfo != null && System.currentTimeMillis() >
+          fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec);
+    }
+
+    /**
+     * Decides whether the calling thread should attempt to connect to the
+     * server even though the client is in fast fail mode for it.
+     *
+     * The idea is that just one client thread actively tries to reconnect,
+     * while all other threads trying to reach the server short circuit.
+     *
+     * @param fInfo the failure record of the server; may be null
+     * @return true if the caller flipped the record's retry flag from false
+     *         to true and should therefore try to connect; false if fInfo
+     *         is null or the flag was already set
+     */
+    private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+      if (fInfo == null) {
+        return false;
+      }
+      // We believe that the server is down, but we want exactly one client
+      // actively trying to connect. Whoever wins the compare-and-set is the
+      // chosen one and retries instead of throwing an exception.
+      return fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true);
+    }
+
+    /**
+     * Updates the failure information for the server after an attempt.
+     *
+     * Does nothing unless a server and a failure record are given and an
+     * attempt was actually made.
+     *
+     * @param server the server that was contacted
+     * @param fInfo the failure record of the server
+     * @param didTry whether an attempt to reach the server was made
+     * @param couldNotCommunicate whether the attempt failed to communicate
+     *          with the server
+     * @param retryDespiteFastFailMode whether the caller was retrying
+     *          in spite of fast fail mode
+     */
+    private void updateFailureInfoForServer(HServerAddress server, FailureInfo fInfo,
+        boolean didTry, boolean couldNotCommunicate, boolean retryDespiteFastFailMode) {
+      if (server == null || fInfo == null || !didTry) {
+        return;
+      }
+
+      if (couldNotCommunicate) {
+        // Record when we last tried.
+        fInfo.timeOfLatestAttemptMilliSec = System.currentTimeMillis();
+
+        // Hand the retry flag back if we were the thread retrying
+        // in spite of fast fail mode.
+        if (retryDespiteFastFailMode) {
+          fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+        }
+      } else {
+        // We were able to connect to the server: reset the failure information.
+        repeatedFailuresMap.remove(server);
+      }
+
+      occasionallyCleanupFailureInformation();
+    }
+
private <R> Callable<MultiResponse> createMultiActionCallable(final HServerAddress address,
final MultiAction multi, final byte [] tableName,
final HBaseRPCOptions options) {
@@ -2123,6 +2368,11 @@ public class HConnectionManager {
toThrow = roe;
}
}
+
+ if (singleServer &&
+ ex.getCause() instanceof PreemptiveFastFailException) {
+ throw (PreemptiveFastFailException)ex.getCause();
+ }
}
// For each region
@@ -2222,6 +2472,8 @@ public class HConnectionManager {
serverRequestedWaitTime = ex.getBackoffTimeMillis();
// do not clear the list
continue;
+ } catch (PreemptiveFastFailException pfe) {
+ throw pfe;
}
list.clear();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Tue Aug 28 20:58:20 2012
@@ -403,7 +403,19 @@ public class HTableMultiplexer {
}
// Process this multiput request
- List<Put> failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options);
+ List<Put> failed = null;
+ try {
+ failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options);
+ } catch(PreemptiveFastFailException e) {
+ // Client is not blocking on us. So, let us treat this
+ // as a normal failure, and retry.
+ for (PutStatus putStatus: processingList) {
+ if (!resubmitFailedPut(putStatus, this.addr)) {
+ failedCount++;
+ }
+ }
+ }
+
if (failed != null) {
if (failed.size() == processingList.size()) {
// All the puts for this region server are failed. Going to retry it later
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java?rev=1378337&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java Tue Aug 28 20:58:20 2012
@@ -0,0 +1,43 @@
+/**
+ *
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.net.ConnectException;
+
+/**
+ * Thrown when the client believes that the server we are trying to
+ * communicate with has been repeatedly unresponsive for a while.
+ *
+ * On receiving such an exception, the HConnectionManager will skip all
+ * retries and fast fail the operation.
+ */
+public class PreemptiveFastFailException extends ConnectException {
+  private static final long serialVersionUID = 7129103682617007177L;
+  // Set once in the constructor and never reassigned, hence final.
+  private final long failureCount;
+  private final long timeOfFirstFailureMilliSec;
+  private final long timeOfLatestAttemptMilliSec;
+
+  /**
+   * @param count the failure count to report
+   * @param timeOfFirstFailureMilliSec time of the first failure, in
+   *          milliseconds
+   * @param timeOfLatestAttemptMilliSec time of the latest attempt, in
+   *          milliseconds
+   */
+  public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
+      long timeOfLatestAttemptMilliSec) {
+    super("Exception happened " + count + " times.");
+    this.failureCount = count;
+    this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
+    this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
+  }
+
+  /** @return the time of the first failure, as given to the constructor */
+  public long getFirstFailureAt() {
+    return timeOfFirstFailureMilliSec;
+  }
+
+  /** @return the time of the latest attempt, as given to the constructor */
+  public long getLastAttemptAt() {
+    return timeOfLatestAttemptMilliSec;
+  }
+
+  /** @return the failure count, as given to the constructor */
+  public long getFailureCount() {
+    return failureCount;
+  }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue Aug 28 20:58:20 2012
@@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -89,6 +90,14 @@ public abstract class ServerCallable<T>
return location.getServerAddress().toString();
}
+  /** @return the server address of the current location, or null if no location is set */
+  public HServerAddress getServerAddress() {
+    return (location == null) ? null : location.getServerAddress();
+  }
+
/** @return the region name */
public byte[] getRegionName() {
if (location == null) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Aug 28 20:58:20 2012
@@ -41,6 +41,8 @@ import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -77,8 +79,9 @@ public class HBaseClient {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
- protected final Hashtable<ConnectionId, Connection> connections =
- new Hashtable<ConnectionId, Connection>();
+ // Active connections are stored in connections.
+ protected final ConcurrentMap<ConnectionId, Connection> connections =
+ new ConcurrentHashMap<ConnectionId, Connection>();
protected final Class<? extends Writable> valueClass; // class of call values
protected int counter; // counter for call ids
@@ -215,9 +218,14 @@ public class HBaseClient {
* @param call to add
* @return true if the call was added.
*/
- protected synchronized boolean addCall(Call call) {
- if (shouldCloseConnection.get())
+ protected synchronized boolean addCall(Call call) throws IOException {
+ if (shouldCloseConnection.get()) {
+ // If there was something bad. Let not the next thread spend a while
+ // figuring it out.
+ if (closeException != null)
+ throw closeException;
return false;
+ }
calls.put(call.id, call);
notify();
return true;
@@ -285,9 +293,12 @@ public class HBaseClient {
* @throws java.io.IOException e
*/
protected synchronized void setupIOstreams(byte version) throws IOException {
- if (socket != null || shouldCloseConnection.get()) {
+ if (socket != null) { // somebody already set up the IOStreams
return;
}
+ if (shouldCloseConnection.get()) { // somebody already tried and failed.
+ throw closeException;
+ }
short ioFailures = 0;
short timeoutFailures = 0;
@@ -301,7 +312,7 @@ public class HBaseClient {
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(tcpKeepAlive);
NetUtils.connect(this.socket, remoteId.getAddress(),
- connectionTimeOutMillSec);
+ connectionTimeOutMillSec);
if (remoteId.rpcTimeout > 0) {
pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
}
@@ -648,11 +659,7 @@ public class HBaseClient {
// release the resources
// first thing to do;take the connection out of the connection list
- synchronized (connections) {
- if (connections.get(remoteId) == this) {
- connections.remove(remoteId);
- }
- }
+ connections.remove(remoteId, this);
// close the streams and therefore the socket
IOUtils.closeStream(out);
@@ -759,7 +766,7 @@ public class HBaseClient {
this.conf = conf;
this.socketFactory = factory;
this.connectionTimeOutMillSec =
- conf.getInt("hbase.client.connection.timeout.millsec", 5000);
+ conf.getInt("hbase.client.connection.timeout.millsec", 5000);
}
/**
@@ -956,12 +963,13 @@ public class HBaseClient {
ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
call.getVersion());
do {
- synchronized (connections) {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ // Do not worry about creating a new Connection object if
+ // the hash map was already updated. The unused connection
+ // will be automatically closed after a 10 sec timeout (maxIdleTime).
+ connections.putIfAbsent(remoteId, new Connection(remoteId));
connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId);
- connections.put(remoteId, connection);
- }
}
} while (!connection.addCall(call));