You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2012/08/21 13:12:11 UTC
svn commit: r1375473 - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-server/src/main/java/org/apache/hadoop/hbase/util/
hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/
Author: nkeywal
Date: Tue Aug 21 11:12:10 2012
New Revision: 1375473
URL: http://svn.apache.org/viewvc?rev=1375473&view=rev
Log:
HBASE-6364 Powering down the server host holding the .META. table causes HBase Client to take excessively long to recover and connect to reassigned .META. table
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
Modified:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Aug 21 11:12:10 2012
@@ -187,6 +187,9 @@ public final class HConstants {
/** Parameter name for what master implementation to use. */
public static final String MASTER_IMPL= "hbase.master.impl";
+ /** Parameter name for what hbase client implementation to use. */
+ public static final String HBASECLIENT_IMPL= "hbase.hbaseclient.impl";
+
/** Parameter name for how often threads should wake up */
public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency";
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientCache.java Tue Aug 21 11:12:10 2012
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.ipc;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
@@ -32,7 +35,7 @@ import org.apache.hadoop.io.Writable;
* Enables reuse/sharing of clients on a per SocketFactory basis. A client
* establishes certain configuration dependent characteristics like timeouts,
* tcp-keepalive (true or false), etc. For more details on the characteristics,
- * look at {@link HBaseClient#HBaseClient(Class, Configuration, SocketFactory)}
+ * look at {@link HBaseClient#HBaseClient(Configuration, SocketFactory)}
* Creation of dynamic proxies to protocols creates the clients (and increments
* reference count once created), and stopping of the proxies leads to clearing
* out references and when the reference drops to zero, the cache mapping is
@@ -52,12 +55,29 @@ class ClientCache {
* @param factory socket factory
* @return an IPC client
*/
- protected synchronized HBaseClient getClient(Configuration conf,
- SocketFactory factory) {
+ @SuppressWarnings("unchecked")
+ protected synchronized HBaseClient getClient(Configuration conf, SocketFactory factory) {
+
HBaseClient client = clients.get(factory);
if (client == null) {
+ Class<? extends HBaseClient> hbaseClientClass = (Class<? extends HBaseClient>) conf
+ .getClass(HConstants.HBASECLIENT_IMPL, HBaseClient.class);
+
// Make an hbase client instead of hadoop Client.
- client = new HBaseClient(conf, factory);
+ try {
+ Constructor<? extends HBaseClient> cst = hbaseClientClass.getConstructor(
+ Configuration.class, SocketFactory.class);
+ client = cst.newInstance(conf, factory);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ } catch (InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("No matching constructor in "+hbaseClientClass.getName(), e);
+ }
+
clients.put(factory, client);
} else {
client.incCount();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Aug 21 11:12:10 2012
@@ -38,6 +38,7 @@ import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.IOUtils;
@@ -113,6 +116,7 @@ public class HBaseClient {
protected final boolean tcpKeepAlive; // if T then use keepalives
protected int pingInterval; // how often sends ping to the server in msecs
protected int socketTimeout; // socket timeout
+ protected FailedServers failedServers;
protected final SocketFactory socketFactory; // how to create sockets
private int refCount = 1;
@@ -124,6 +128,68 @@ public class HBaseClient {
final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
final static int PING_CALL_ID = -1;
+ /** Configuration key: how long, in milliseconds, a server stays in the failed servers list. */
+ public static final String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
+ /** Default value for {@link #FAILED_SERVER_EXPIRY_KEY}, in milliseconds. */
+ public static final int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
+
+ /**
+ * A class to manage a list of servers that failed recently.
+ */
+ static class FailedServers {
+ /** Failed servers as (expiry time in ms, address string) pairs, most recently added first. */
+ private final LinkedList<Pair<Long, String>> failedServers =
+ new LinkedList<Pair<Long, String>>();
+ /** How long, in ms, an address stays in the list after being added. */
+ private final int recheckServersTimeout;
+
+ FailedServers(Configuration conf) {
+ this.recheckServersTimeout =
+ conf.getInt(FAILED_SERVER_EXPIRY_KEY, FAILED_SERVER_EXPIRY_DEFAULT);
+ }
+
+ /**
+ * Marks the given address as failed until the configured expiry has elapsed.
+ */
+ public synchronized void addToFailedServers(InetSocketAddress address) {
+ final long now = EnvironmentEdgeManager.currentTimeMillis();
+ final Pair<Long, String> entry =
+ new Pair<Long, String>(now + recheckServersTimeout, address.toString());
+ failedServers.addFirst(entry);
+ }
+
+ /**
+ * Tells whether the server should currently be considered bad. Expired entries met
+ * while scanning the list are removed from it.
+ *
+ * @return true if the server is in the failed servers list and its entry has not expired
+ */
+ public synchronized boolean isFailedServer(final InetSocketAddress address) {
+ if (!failedServers.isEmpty()) {
+ final String key = address.toString();
+ final long now = EnvironmentEdgeManager.currentTimeMillis();
+ for (Iterator<Pair<Long, String>> iter = failedServers.iterator(); iter.hasNext();) {
+ final Pair<Long, String> entry = iter.next();
+ if (entry.getFirst() < now) {
+ // Expired: drop it while walking the list.
+ iter.remove();
+ } else if (key.equals(entry.getSecond())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ }
+
+ /**
+ * Thrown when a connection attempt is not made because the target server is in the
+ * failed servers list.
+ */
+ public static class FailedServerException extends IOException {
+ public FailedServerException(String message) {
+ super(message);
+ }
+ }
+
+
/**
* set the ping interval value in configuration
*
@@ -240,6 +306,15 @@ public class HBaseClient {
tokenHandlers.put(AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE.toString(),
new AuthenticationTokenSelector());
}
+
+ /**
+ * Creates a connection. Can be overridden by a subclass for testing.
+ * @param remoteId - the ConnectionId to use for the connection creation.
+ */
+ protected Connection createConnection(ConnectionId remoteId) throws IOException {
+ return new Connection(remoteId);
+ }
+
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -263,7 +338,7 @@ public class HBaseClient {
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
- public Connection(ConnectionId remoteId) throws IOException {
+ Connection(ConnectionId remoteId) throws IOException {
if (remoteId.getAddress().isUnresolved()) {
throw new UnknownHostException("unknown host: " +
remoteId.getAddress().getHostName());
@@ -359,17 +434,30 @@ public class HBaseClient {
/**
* Add a call to this connection's call queue and notify
- * a listener; synchronized.
- * Returns false if called during shutdown.
+ * a listener; synchronized. If the connection is dead, the call is not added, and the
+ * caller is notified.
+ * As a consequence, the caller may end up with a connection that is already marked as
+ * 'shouldCloseConnection'; it is up to the calling code to check this status.
* @param call to add
- * @return true if the call was added.
*/
- protected synchronized boolean addCall(Call call) {
- if (shouldCloseConnection.get())
- return false;
- calls.put(call.id, call);
- notify();
- return true;
+ protected synchronized void addCall(Call call) {
+ // If the connection is about to close, handle the call as if it had already been added
+ // to this connection's calls and then failed: the call gets an exception and its
+ // waiters are woken up. The previous version returned false here and the caller
+ // retried in a loop, which serialized connection creations, as described in HBASE-6364.
+ if (this.shouldCloseConnection.get()) {
+ // Prefer the recorded close reason; fall back to a generic error if none was recorded.
+ if (this.closeException == null) {
+ call.setException(new IOException(
+ "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
+ } else {
+ call.setException(this.closeException);
+ }
+ // Wake up every thread waiting on the call's monitor so it can observe the exception.
+ synchronized (call) {
+ call.notifyAll();
+ }
+ } else {
+ calls.put(call.id, call);
+ // Wake one thread waiting on this connection's monitor (presumably the connection's
+ // own receiver thread) — NOTE(review): confirm against Connection.waitForWork.
+ notify();
+ }
}
/** This class sends a ping to the remote side when timeout on
@@ -682,6 +770,18 @@ public class HBaseClient {
return;
}
+ if (failedServers.isFailedServer(remoteId.getAddress())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not trying to connect to " + server +
+ " this server is in the failed servers list");
+ }
+ IOException e = new FailedServerException(
+ "This server is in the failed servers list: " + server);
+ markClosed(e);
+ close();
+ throw e;
+ }
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
@@ -698,7 +798,7 @@ public class HBaseClient {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
UserGroupInformation ticket = remoteId.getTicket().getUGI();
- if (authMethod == AuthMethod.KERBEROS) {;
+ if (authMethod == AuthMethod.KERBEROS) {
if (ticket != null && ticket.getRealUser() != null) {
ticket = ticket.getRealUser();
}
@@ -744,6 +844,7 @@ public class HBaseClient {
return;
}
} catch (IOException e) {
+ failedServers.addToFailedServers(remoteId.address);
markClosed(e);
close();
@@ -1037,7 +1138,6 @@ public class HBaseClient {
/**
* Construct an IPC client whose values are of the {@link Message}
* class.
- * @param valueClass value class
* @param conf configuration
* @param factory socket factory
*/
@@ -1057,6 +1157,7 @@ public class HBaseClient {
this.clusterId = conf.get(HConstants.CLUSTER_ID, "default");
this.connections = new PoolMap<ConnectionId, Connection>(
getPoolType(conf), getPoolSize(conf));
+ this.failedServers = new FailedServers(conf);
}
/**
@@ -1297,20 +1398,22 @@ public class HBaseClient {
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, protocol, ticket, rpcTimeout);
- do {
- synchronized (connections) {
- connection = connections.get(remoteId);
- if (connection == null) {
- connection = new Connection(remoteId);
- connections.put(remoteId, connection);
- }
+ synchronized (connections) {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ connection = createConnection(remoteId);
+ connections.put(remoteId, connection);
}
- } while (!connection.addCall(call));
+ }
+ connection.addCall(call);
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
+ //Moreover, if the connection is currently being created, many threads may be
+ // waiting here, as setupIOstreams is synchronized. If the connection fails with a
+ // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
connection.setupIOstreams();
return connection;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdgeManager.java Tue Aug 21 11:12:10 2012
@@ -50,7 +50,7 @@ public class EnvironmentEdgeManager {
* Resets the managed instance to the default instance: {@link
* DefaultEnvironmentEdge}.
*/
- static void reset() {
+ public static void reset() {
injectEdge(new DefaultEnvironmentEdge());
}
@@ -60,7 +60,7 @@ public class EnvironmentEdgeManager {
*
* @param edge the new edge.
*/
- static void injectEdge(EnvironmentEdge edge) {
+ public static void injectEdge(EnvironmentEdge edge) {
if (edge == null) {
reset();
} else {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java?rev=1375473&r1=1375472&r2=1375473&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java Tue Aug 21 11:12:10 2012
@@ -35,6 +35,10 @@ public class ManualEnvironmentEdge imple
value = newValue;
}
+ /**
+ * Moves this edge's time forward by {@code addedValue} (backward if it is negative).
+ */
+ public void incValue(long addedValue) {
+ this.value += addedValue;
+ }
+
@Override
public long currentTimeMillis() {
return this.value;
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java?rev=1375473&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseClient.java Tue Aug 21 11:12:10 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.InetSocketAddress;
+
+@Category(MediumTests.class) // Can't be small, we're playing with the EnvironmentEdge
+public class TestHBaseClient {
+
+ /**
+ * Checks that FailedServers reports an address as failed until its entry expires, that
+ * addresses are matched on both host and port, and that expired entries are no longer
+ * reported. The manual clock injected into the global EnvironmentEdgeManager is reset in
+ * a finally block so it cannot leak into other tests run in the same JVM, even when an
+ * assertion fails.
+ */
+ @Test
+ public void testFailedServer(){
+ ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(ee);
+ try {
+ HBaseClient.FailedServers fs = new HBaseClient.FailedServers(new Configuration());
+
+ InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
+ InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia
+ InetSocketAddress ia3 = InetSocketAddress.createUnresolved("badtoo", 12);
+ InetSocketAddress ia4 = InetSocketAddress.createUnresolved("badtoo", 13);
+
+ Assert.assertFalse(fs.isFailedServer(ia));
+
+ fs.addToFailedServers(ia);
+ Assert.assertTrue(fs.isFailedServer(ia));
+ Assert.assertTrue(fs.isFailedServer(ia2));
+ // A different host on the same port is not affected.
+ Assert.assertFalse(fs.isFailedServer(ia3));
+
+ ee.incValue(1);
+ Assert.assertTrue(fs.isFailedServer(ia));
+ Assert.assertTrue(fs.isFailedServer(ia2));
+
+ ee.incValue(HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1);
+ Assert.assertFalse(fs.isFailedServer(ia));
+ Assert.assertFalse(fs.isFailedServer(ia2));
+
+ fs.addToFailedServers(ia);
+ fs.addToFailedServers(ia3);
+ fs.addToFailedServers(ia4);
+
+ Assert.assertTrue(fs.isFailedServer(ia));
+ Assert.assertTrue(fs.isFailedServer(ia2));
+ Assert.assertTrue(fs.isFailedServer(ia3));
+ Assert.assertTrue(fs.isFailedServer(ia4));
+
+ ee.incValue(HBaseClient.FAILED_SERVER_EXPIRY_DEFAULT + 1);
+ Assert.assertFalse(fs.isFailedServer(ia));
+ Assert.assertFalse(fs.isFailedServer(ia2));
+ Assert.assertFalse(fs.isFailedServer(ia3));
+ Assert.assertFalse(fs.isFailedServer(ia4));
+
+ // Same host, different port: still a different server.
+ fs.addToFailedServers(ia3);
+ Assert.assertFalse(fs.isFailedServer(ia4));
+ } finally {
+ // Restore the default clock so later tests are not driven by the manual edge.
+ EnvironmentEdgeManager.reset();
+ }
+ }
+}