You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/07/03 12:02:29 UTC
svn commit: r1499302 - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-server/src/test/java/org/apache/hadoop/hbase/client/
Author: nkeywal
Date: Wed Jul 3 10:02:28 2013
New Revision: 1499302
URL: http://svn.apache.org/r1499302
Log:
HBASE-8853 The client connection is not cut when receiving the failure notification
Modified:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java Wed Jul 3 10:02:28 2013
@@ -62,7 +62,7 @@ import java.util.concurrent.Executors;
class ClusterStatusListener implements Closeable {
private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
private final List<ServerName> deadServers = new ArrayList<ServerName>();
- private final DeadServerHandler deadServerHandler;
+ protected final DeadServerHandler deadServerHandler;
private final Listener listener;
/**
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul 3 10:02:28 2013
@@ -520,12 +520,13 @@ public class HConnectionManager {
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
- new SocketException(sn.getServerName() + " is dead: closing its connection."));
+ new SocketException(sn.getServerName() + " is dead: closing its connection."));
}
}, conf, listenerClass);
}
}
+
/**
* For tests.
*/
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Jul 3 10:02:28 2013
@@ -571,14 +571,38 @@ public class RpcClient {
}
protected void closeConnection() {
+ if (socket == null) {
+ return;
+ }
+
// close the current connection
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- LOG.warn("Not able to close a socket", e);
+ try {
+ if (socket.getOutputStream() != null) {
+ socket.getOutputStream().close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Not able to close an output stream", e);
+ }
+ try {
+ if (socket.getInputStream() != null) {
+ socket.getInputStream().close();
}
+ } catch (IOException e) {
+ LOG.warn("Not able to close an input stream", e);
+ }
+ try {
+ if (socket.getChannel() != null) {
+ socket.getChannel().close();
+ }
+ } catch (IOException e) {
+ LOG.warn("Not able to close a channel", e);
}
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.warn("Not able to close a socket", e);
+ }
+
// set socket to null so that the next call to setupIOstreams
// can start the process of connect all over again.
socket = null;
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Jul 3 10:02:28 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,7 @@ import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,8 +46,11 @@ import org.apache.hadoop.hbase.MediumTes
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.master.ClusterStatusPublisher;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -79,6 +84,7 @@ public class TestHCM {
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx");
+ private static Random _randy = new Random();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -94,34 +100,6 @@ public class TestHCM {
TEST_UTIL.shutdownMiniCluster();
}
- private static Random _randy = new Random();
-
- public static void createNewConfigurations() throws SecurityException,
- IllegalArgumentException, NoSuchFieldException,
- IllegalAccessException, InterruptedException, ZooKeeperConnectionException, IOException {
- HConnection last = null;
- for (int i = 0; i <= (HConnectionManager.MAX_CACHED_CONNECTION_INSTANCES * 2); i++) {
- // set random key to differentiate the connection from previous ones
- Configuration configuration = HBaseConfiguration.create();
- configuration.set("somekey", String.valueOf(_randy.nextInt()));
- System.out.println("Hash Code: " + configuration.hashCode());
- HConnection connection = HConnectionManager.getConnection(configuration);
- if (last != null) {
- if (last == connection) {
- System.out.println("!! Got same connection for once !!");
- }
- }
- // change the configuration once, and the cached connection is lost forever:
- // the hashtable holding the cache won't be able to find its own keys
- // to remove them, so the LRU strategy does not work.
- configuration.set("someotherkey", String.valueOf(_randy.nextInt()));
- last = connection;
- LOG.info("Cache Size: " + getHConnectionManagerCacheSize());
- Thread.sleep(100);
- }
- Assert.assertEquals(1,
- getHConnectionManagerCacheSize());
- }
private static int getHConnectionManagerCacheSize(){
return HConnectionTestingUtility.getConnectionCount();
@@ -182,6 +160,93 @@ public class TestHCM {
hci.getClient(sn); // will throw an exception: RegionServerStoppedException
}
+ /**
+ * Test that the connection to the dead server is cut immediately when we receive the
+ * notification.
+ * @throws Exception
+ */
+ @Test
+ public void testConnectionCut() throws Exception {
+ String tableName = "HCM-testConnectionCut";
+
+ TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+ boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+ Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+ // We want to work on a separate connection.
+ c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+ c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
+
+ HTable table = new HTable(c2, tableName);
+
+ Put p = new Put(FAM_NAM);
+ p.add(FAM_NAM, FAM_NAM, FAM_NAM);
+ table.put(p);
+
+ final HConnectionImplementation hci = (HConnectionImplementation)table.getConnection();
+ final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
+
+ Get get = new Get(FAM_NAM);
+ Assert.assertNotNull(table.get(get));
+
+ get = new Get(FAM_NAM);
+ get.setFilter(new BlockingFilter());
+
+ // This thread will mark the server as dead while we're waiting during a get.
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ synchronized (syncBlockingFilter) {
+ try {
+ syncBlockingFilter.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
+ }
+ };
+
+ t.start();
+ try {
+ table.get(get);
+ Assert.fail();
+ } catch (IOException expected) {
+ LOG.debug("Received: " + expected);
+ Assert.assertFalse(expected instanceof SocketTimeoutException);
+ Assert.assertFalse(syncBlockingFilter.get());
+ } finally {
+ syncBlockingFilter.set(true);
+ t.join();
+ HConnectionManager.getConnection(c2).close();
+ TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+ }
+
+ table.close();
+ }
+
+ protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
+
+ public static class BlockingFilter extends FilterBase {
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ int i = 0;
+ while (i++ < 1000 && !syncBlockingFilter.get()) {
+ synchronized (syncBlockingFilter) {
+ syncBlockingFilter.notifyAll();
+ }
+ Threads.sleep(100);
+ }
+ syncBlockingFilter.set(true);
+ return false;
+ }
+
+ public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
+ return new BlockingFilter();
+ }
+ }
+
@Test
public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
// Save off current HConnections
@@ -491,7 +556,7 @@ public class TestHCM {
public void testConnectionUniqueness() throws Exception {
int zkmaxconnections = TEST_UTIL.getConfiguration().
getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
- HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
+ HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
// Test up to a max that is < the maximum number of zk connections. If we
// go above zk connections, we just fall into cycle where we are failing
// to set up a session and test runs for a long time.
@@ -608,6 +673,18 @@ public class TestHCM {
conn.close();
}
+ private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
+ Field numTries = hci.getClass().getDeclaredField("numTries");
+ numTries.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+ final int prevNumRetriesVal = (Integer)numTries.get(hci);
+ numTries.set(hci, newVal);
+
+ return prevNumRetriesVal;
+ }
+
@Test
public void testMulti() throws Exception {
HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
@@ -703,13 +780,7 @@ public class TestHCM {
conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
// Hijack the number of retry to fail after 2 tries
- Field numTries = conn.getClass().getDeclaredField("numTries");
- numTries.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
- final int prevNumRetriesVal = (Integer)numTries.get(conn);
- numTries.set(conn, 2);
+ final int prevNumRetriesVal = setNumTries(conn, 2);
Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -720,7 +791,7 @@ public class TestHCM {
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException.
- numTries.set(conn, prevNumRetriesVal);
+ setNumTries(conn, prevNumRetriesVal);
table.close();
conn.close();
}