You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/07/03 12:02:29 UTC

svn commit: r1499302 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/test/java/org/apache/hadoop/hbase/client/

Author: nkeywal
Date: Wed Jul  3 10:02:28 2013
New Revision: 1499302

URL: http://svn.apache.org/r1499302
Log:
HBASE-8853 The client connection is not cut when receiving the failure notification

Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java Wed Jul  3 10:02:28 2013
@@ -62,7 +62,7 @@ import java.util.concurrent.Executors;
 class ClusterStatusListener implements Closeable {
   private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
   private final List<ServerName> deadServers = new ArrayList<ServerName>();
-  private final DeadServerHandler deadServerHandler;
+  protected final DeadServerHandler deadServerHandler;
   private final Listener listener;
 
   /**

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Jul  3 10:02:28 2013
@@ -520,12 +520,13 @@ public class HConnectionManager {
               public void newDead(ServerName sn) {
                 clearCaches(sn);
                 rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
-                  new SocketException(sn.getServerName() + " is dead: closing its connection."));
+                    new SocketException(sn.getServerName() + " is dead: closing its connection."));
               }
             }, conf, listenerClass);
       }
     }
 
+
     /**
      * For tests.
      */

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Jul  3 10:02:28 2013
@@ -571,14 +571,38 @@ public class RpcClient {
     }
 
     protected void closeConnection() {
+      if (socket == null) {
+        return;
+      }
+
       // close the current connection
-      if (socket != null) {
-        try {
-          socket.close();
-        } catch (IOException e) {
-          LOG.warn("Not able to close a socket", e);
+      try {
+        if (socket.getOutputStream() != null) {
+          socket.getOutputStream().close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Not able to close an output stream", e);
+      }
+      try {
+        if (socket.getInputStream() != null) {
+          socket.getInputStream().close();
         }
+      } catch (IOException e) {
+        LOG.warn("Not able to close an input stream", e);
+      }
+      try {
+        if (socket.getChannel() != null) {
+          socket.getChannel().close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Not able to close a channel", e);
       }
+      try {
+        socket.close();
+      } catch (IOException e) {
+        LOG.warn("Not able to close a socket", e);
+      }
+
       // set socket to null so that the next call to setupIOstreams
       // can start the process of connect all over again.
       socket = null;

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1499302&r1=1499301&r2=1499302&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Jul  3 10:02:28 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.Random;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,8 +46,11 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
 import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.master.ClusterStatusPublisher;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -79,6 +84,7 @@ public class TestHCM {
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
+  private static Random _randy = new Random();
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -94,34 +100,6 @@ public class TestHCM {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  private static Random _randy = new Random();
-
-  public static void createNewConfigurations() throws SecurityException,
-  IllegalArgumentException, NoSuchFieldException,
-  IllegalAccessException, InterruptedException, ZooKeeperConnectionException, IOException {
-    HConnection last = null;
-    for (int i = 0; i <= (HConnectionManager.MAX_CACHED_CONNECTION_INSTANCES * 2); i++) {
-      // set random key to differentiate the connection from previous ones
-      Configuration configuration = HBaseConfiguration.create();
-      configuration.set("somekey", String.valueOf(_randy.nextInt()));
-      System.out.println("Hash Code: " + configuration.hashCode());
-      HConnection connection = HConnectionManager.getConnection(configuration);
-      if (last != null) {
-        if (last == connection) {
-          System.out.println("!! Got same connection for once !!");
-        }
-      }
-      // change the configuration once, and the cached connection is lost forever:
-      //      the hashtable holding the cache won't be able to find its own keys
-      //      to remove them, so the LRU strategy does not work.
-      configuration.set("someotherkey", String.valueOf(_randy.nextInt()));
-      last = connection;
-      LOG.info("Cache Size: " + getHConnectionManagerCacheSize());
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(1,
-      getHConnectionManagerCacheSize());
-  }
 
   private static int getHConnectionManagerCacheSize(){
     return HConnectionTestingUtility.getConnectionCount();
@@ -182,6 +160,93 @@ public class TestHCM {
     hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
   }
 
+  /**
+   * Test that the connection to the dead server is cut immediately when we receive the
+   *  notification.
+   * @throws Exception
+   */
+  @Test
+  public void testConnectionCut() throws Exception {
+    String tableName = "HCM-testConnectionCut";
+
+    TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+    boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+    // We want to work on a separate connection.
+    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
+
+    HTable table = new HTable(c2, tableName);
+
+    Put p = new Put(FAM_NAM);
+    p.add(FAM_NAM, FAM_NAM, FAM_NAM);
+    table.put(p);
+
+    final HConnectionImplementation hci =  (HConnectionImplementation)table.getConnection();
+    final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
+
+    Get get = new Get(FAM_NAM);
+    Assert.assertNotNull(table.get(get));
+
+    get = new Get(FAM_NAM);
+    get.setFilter(new BlockingFilter());
+
+    // This thread will mark the server as dead while we're waiting during a get.
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        synchronized (syncBlockingFilter) {
+          try {
+            syncBlockingFilter.wait();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
+      }
+    };
+
+    t.start();
+    try {
+      table.get(get);
+      Assert.fail();
+    } catch (IOException expected) {
+      LOG.debug("Received: " + expected);
+      Assert.assertFalse(expected instanceof SocketTimeoutException);
+      Assert.assertFalse(syncBlockingFilter.get());
+    } finally {
+      syncBlockingFilter.set(true);
+      t.join();
+      HConnectionManager.getConnection(c2).close();
+      TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+    }
+
+    table.close();
+  }
+
+  protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
+
+  public static class BlockingFilter extends FilterBase {
+    @Override
+    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+      int i = 0;
+      while (i++ < 1000 && !syncBlockingFilter.get()) {
+        synchronized (syncBlockingFilter) {
+          syncBlockingFilter.notifyAll();
+        }
+        Threads.sleep(100);
+      }
+      syncBlockingFilter.set(true);
+      return false;
+    }
+
+    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
+      return new BlockingFilter();
+    }
+  }
+
   @Test
   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
     // Save off current HConnections
@@ -491,7 +556,7 @@ public class TestHCM {
   public void testConnectionUniqueness() throws Exception {
     int zkmaxconnections = TEST_UTIL.getConfiguration().
       getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
-        HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
+          HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
     // Test up to a max that is < the maximum number of zk connections.  If we
     // go above zk connections, we just fall into cycle where we are failing
     // to set up a session and test runs for a long time.
@@ -608,6 +673,18 @@ public class TestHCM {
     conn.close();
   }
 
+  private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
+    Field numTries = hci.getClass().getDeclaredField("numTries");
+    numTries.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+    final int prevNumRetriesVal = (Integer)numTries.get(hci);
+    numTries.set(hci, newVal);
+
+    return prevNumRetriesVal;
+  }
+
   @Test
   public void testMulti() throws Exception {
     HTable table = TEST_UTIL.createTable(TABLE_NAME3, FAM_NAM);
@@ -703,13 +780,7 @@ public class TestHCM {
         conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
 
     // Hijack the number of retry to fail after 2 tries
-    Field numTries = conn.getClass().getDeclaredField("numTries");
-    numTries.setAccessible(true);
-    Field modifiersField = Field.class.getDeclaredField("modifiers");
-    modifiersField.setAccessible(true);
-    modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
-    final int prevNumRetriesVal = (Integer)numTries.get(conn);
-    numTries.set(conn, 2);
+    final int prevNumRetriesVal = setNumTries(conn, 2);
 
     Put put3 = new Put(ROW_X);
     put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -720,7 +791,7 @@ public class TestHCM {
     table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
                                                  // second we get RegionMovedException.
 
-    numTries.set(conn, prevNumRetriesVal);
+    setNumTries(conn, prevNumRetriesVal);
     table.close();
     conn.close();
   }