You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/02/05 19:00:09 UTC

svn commit: r1564851 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-common/src/main/java/org/apach...

Author: apurtell
Date: Wed Feb  5 18:00:09 2014
New Revision: 1564851

URL: http://svn.apache.org/r1564851
Log:
HBASE-10337 HTable.get() uninteruptible (Nicolas Liochon)

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1564851&r1=1564850&r2=1564851&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Feb  5 18:00:09 2014
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -532,6 +533,7 @@ public class HConnectionManager {
       try {
         connection.close();
       } catch (Exception e) {
+        ExceptionUtil.rethrowIfInterrupt(e);
         if (connectSucceeded) {
           throw new IOException("The connection to " + connection
               + " could not be deleted.", e);
@@ -1123,7 +1125,11 @@ public class HConnectionManager {
         MetaScanner.metaScan(conf, this, visitor, tableName, row,
             this.prefetchRegionLimit, TableName.META_TABLE_NAME);
       } catch (IOException e) {
-        LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
+        if (ExceptionUtil.isInterrupt(e)) {
+          Thread.currentThread().interrupt();
+        } else {
+          LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
+        }
       }
     }
 
@@ -1252,6 +1258,8 @@ public class HConnectionManager {
           // from the HTable constructor.
           throw e;
         } catch (IOException e) {
+          ExceptionUtil.rethrowIfInterrupt(e);
+
           if (e instanceof RemoteException) {
             e = ((RemoteException)e).unwrapRemoteException();
           }
@@ -1528,6 +1536,7 @@ public class HConnectionManager {
         try {
           zkw = getKeepAliveZooKeeperWatcher();
         } catch (IOException e) {
+          ExceptionUtil.rethrowIfInterrupt(e);
           throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
         }
         try {
@@ -1588,7 +1597,7 @@ public class HConnectionManager {
 
             if (exceptionCaught != null)
               // It failed. If it's not the last try, we're going to wait a little
-              if (tries < numTries) {
+              if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
                 // tries at this point is 1 or more; decrement to start from 0.
                 long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
                 LOG.info("getMaster attempt " + tries + " of " + numTries +
@@ -1598,7 +1607,7 @@ public class HConnectionManager {
                 try {
                   Thread.sleep(pauseTime);
                 } catch (InterruptedException e) {
-                  throw new RuntimeException(
+                  throw new MasterNotRunningException(
                       "Thread was interrupted while trying to connect to master.", e);
                 }
               } else {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1564851&r1=1564850&r2=1564851&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Wed Feb  5 18:00:09 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.protobuf.ServiceException;
@@ -130,6 +131,7 @@ public class RpcRetryingCaller<T> {
             new RetriesExhaustedException.ThrowableWithExtraContext(t,
                 EnvironmentEdgeManager.currentTimeMillis(), toString());
         exceptions.add(qt);
+        ExceptionUtil.rethrowIfInterrupt(t);
         if (tries >= retries - 1) {
           throw new RetriesExhaustedException(tries, exceptions);
         }
@@ -184,6 +186,7 @@ public class RpcRetryingCaller<T> {
       return callable.call();
     } catch (Throwable t) {
       Throwable t2 = translateException(t);
+      ExceptionUtil.rethrowIfInterrupt(t2);
       // It would be nice to clear the location cache here.
       if (t2 instanceof IOException) {
         throw (IOException)t2;

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1564851&r1=1564850&r2=1564851&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Feb  5 18:00:09 2014
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -642,18 +644,19 @@ public class RpcClient {
      */
     private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
     throws IOException {
-
       closeConnection();
 
       // throw the exception if the maximum number of retries is reached
-      if (curRetries >= maxRetries) {
+      if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
         throw ioe;
       }
 
       // otherwise back off and retry
       try {
         Thread.sleep(failureSleep);
-      } catch (InterruptedException ignored) {}
+      } catch (InterruptedException ie) {
+        ExceptionUtil.rethrowIfInterrupt(ie);
+      }
 
       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
@@ -672,7 +675,9 @@ public class RpcClient {
         if (timeout>0) {
           try {
             wait(timeout);
-          } catch (InterruptedException ignored) {}
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
         }
       }
 
@@ -1112,6 +1117,8 @@ public class RpcClient {
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
           closeException = e;
+        } if (ExceptionUtil.isInterrupt(e)){
+
         } else {
           // Treat this as a fatal condition and close this connection
           markClosed(e);
@@ -1425,24 +1432,14 @@ public class RpcClient {
     Connection connection =
       getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
     connection.writeRequest(call, priority);                 // send the parameter
-    boolean interrupted = false;
+
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
       while (!call.done) {
         if (connection.shouldCloseConnection.get()) {
           throw new IOException("Unexpected closed connection");
         }
-        try {
-          call.wait(1000);                       // wait for the result
-        } catch (InterruptedException ignored) {
-          // save the fact that we were interrupted
-          interrupted = true;
-        }
-      }
-
-      if (interrupted) {
-        // set the interrupt flag now that we are done waiting
-        Thread.currentThread().interrupt();
+        call.wait(1000);                       // wait for the result
       }
 
       if (call.error != null) {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1564851&r1=1564850&r2=1564851&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Feb  5 18:00:09 2014
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Text;
@@ -276,8 +277,11 @@ public final class ProtobufUtil {
     if (e == null) {
       return new IOException(se);
     }
+    if (ExceptionUtil.isInterrupt(e)) {
+      return ExceptionUtil.asInterrupt(e);
+    }
     if (e instanceof RemoteException) {
-      e = ((RemoteException)e).unwrapRemoteException();
+      e = ((RemoteException) e).unwrapRemoteException();
     }
     return e instanceof IOException ? (IOException) e : new IOException(se);
   }

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java?rev=1564851&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java Wed Feb  5 18:00:09 2014
@@ -0,0 +1,63 @@
+package org.apache.hadoop.hbase.util;
+
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * This class handles the different interruption classes.
+ * It can be:
+ * - InterruptedException
+ * - InterruptedIOException (inherits IOException); used in IO
+ * - ClosedByInterruptException (inherits IOException, but not InterruptedIOException, so it
+ *   has to be tested for explicitly)
+ * - SocketTimeoutException inherits InterruptedIOException but is not a real
+ * interruption, so we have to distinguish the case. This pattern is unfortunately common.
+ */
+public class ExceptionUtil {
+
+  /**
+   * @param t the throwable to classify; null is accepted and is not an interruption
+   * @return true if the throwable comes from an interruption, false otherwise.
+   */
+  public static boolean isInterrupt(Throwable t) {
+    if (t instanceof InterruptedException) return true;
+    // A timeout is an InterruptedIOException by inheritance only; the thread was not interrupted.
+    if (t instanceof SocketTimeoutException) return false;
+    return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException);
+  }
+
+  /**
+   * @param t the throwable to check; null is accepted
+   * @throws InterruptedIOException if t was an interruption. Does nothing otherwise.
+   */
+  public static void rethrowIfInterrupt(Throwable t) throws InterruptedIOException {
+    InterruptedIOException iie = asInterrupt(t);
+    if (iie != null) throw iie;
+  }
+
+  /**
+   * @param t the throwable to convert; null is accepted
+   * @return an InterruptedIOException if t was an interruption, null otherwise. An
+   *  InterruptedIOException is returned as is; an InterruptedException or a
+   *  ClosedByInterruptException is wrapped in a new InterruptedIOException whose cause is t.
+   */
+  public static InterruptedIOException asInterrupt(Throwable t) {
+    if (t instanceof SocketTimeoutException) return null;
+
+    if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
+
+    if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
+      // Name the original type in the message so it survives even if only the message is logged.
+      InterruptedIOException iie =
+          new InterruptedIOException("Origin: " + t.getClass().getSimpleName());
+      iie.initCause(t);
+      return iie;
+    }
+
+    return null;
+  }
+
+  private ExceptionUtil() {
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java?rev=1564851&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java Wed Feb  5 18:00:09 2014
@@ -0,0 +1,174 @@
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Checks that a client thread blocked in HTable.get() can be interrupted. Half of the client
+ * threads are interrupted while a coprocessor delays every get; a failing get is only accepted
+ * when it fails with an InterruptedIOException that is not a SocketTimeoutException.
+ */
+@Category(MediumTests.class)
+public class TestClientOperationInterrupt {
+  private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
+
+  private static HBaseTestingUtility util;
+  private static final byte[] tableName = Bytes.toBytes("test");
+  private static final byte[] dummy = Bytes.toBytes("dummy");
+  private static final byte[] row1 = Bytes.toBytes("r1");
+  private static final byte[] test = Bytes.toBytes("test");
+  private static Configuration conf;
+
+  /** Region observer that sleeps 2500 ms before every get so that clients are kept waiting. */
+  public static class TestCoprocessor extends BaseRegionObserver {
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+                         final Get get, final List<Cell> results) throws IOException {
+      Threads.sleep(2500);
+    }
+  }
+
+  /**
+   * Starts a mini cluster configured with {@link TestCoprocessor}, (re)creates the test table and
+   * writes the single row read by the test.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        TestCoprocessor.class.getName());
+    util = new HBaseTestingUtility(conf);
+    util.startMiniCluster();
+
+    HBaseAdmin admin = util.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    util.createTable(tableName, new byte[][]{dummy, test});
+
+    HTable ht = new HTable(conf, tableName);
+    try {
+      Put p = new Put(row1);
+      p.add(dummy, dummy, dummy);
+      ht.put(p);
+    } finally {
+      ht.close();
+    }
+  }
+
+  /**
+   * Runs 100 concurrent gets, interrupts the first half of the threads, and checks that exactly
+   * half of the gets succeeded, that no unexpected exception type was seen, and that the table is
+   * still readable afterwards.
+   */
+  @Test
+  public void testInterrupt50Percent() throws IOException, InterruptedException {
+    final AtomicInteger noEx = new AtomicInteger(0);
+    final AtomicInteger badEx = new AtomicInteger(0);
+    final AtomicInteger noInt = new AtomicInteger(0);
+    final AtomicInteger done = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<Thread>();
+
+    final int nbThread = 100;
+
+    for (int i = 0; i < nbThread; i++) {
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          HTable ht = null;
+          try {
+            ht = new HTable(conf, tableName);
+            ht.get(new Get(row1));
+            noEx.incrementAndGet();
+          } catch (IOException e) {
+            LOG.info("exception", e);
+            if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
+              badEx.incrementAndGet();
+            } else {
+              if (Thread.currentThread().isInterrupted()) {
+                noInt.incrementAndGet();
+                LOG.info("The thread should NOT be with the 'interrupt' status.");
+              }
+            }
+          } finally {
+            done.incrementAndGet();
+            closeQuietly(ht);
+          }
+        }
+      };
+      t.setName("TestClientOperationInterrupt #" + i);
+      threads.add(t);
+      t.start();
+    }
+
+    for (int i = 0; i < nbThread / 2; i++) {
+      threads.get(i).interrupt();
+    }
+
+    // Block until every worker has terminated instead of polling isAlive().
+    for (Thread t : threads) {
+      t.join();
+    }
+
+    Assert.assertFalse(Thread.currentThread().isInterrupted());
+
+    Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
+        noEx.get() == nbThread / 2 && badEx.get() == 0);
+
+    // Every worker increments 'done' in its finally block, so all operations are over here.
+    // The get below may still have to wait for the server to free its handlers.
+    Assert.assertEquals(nbThread, done.get());
+
+    HTable ht = new HTable(conf, tableName);
+    try {
+      Result r = ht.get(new Get(row1));
+      Assert.assertFalse(r.isEmpty());
+    } finally {
+      ht.close();
+    }
+  }
+
+  /** Closes the table if it was created; a failure to close is logged, not counted as badEx. */
+  private static void closeQuietly(HTable ht) {
+    if (ht == null) {
+      return;
+    }
+    try {
+      ht.close();
+    } catch (IOException e) {
+      LOG.info("Failed to close the table", e);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}