You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/02/05 19:01:12 UTC
svn commit: r1564853 - in /hbase/branches/0.98:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-common/src/main/java/o...
Author: apurtell
Date: Wed Feb 5 18:01:11 2014
New Revision: 1564853
URL: http://svn.apache.org/r1564853
Log:
HBASE-10337 HTable.get() uninterruptible (Nicolas Liochon)
Added:
hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
Modified:
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1564853&r1=1564852&r2=1564853&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Feb 5 18:01:11 2014
@@ -118,6 +118,7 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -555,6 +556,7 @@ public class HConnectionManager {
try {
connection.close();
} catch (Exception e) {
+ ExceptionUtil.rethrowIfInterrupt(e);
if (connectSucceeded) {
throw new IOException("The connection to " + connection
+ " could not be deleted.", e);
@@ -1156,7 +1158,11 @@ public class HConnectionManager {
MetaScanner.metaScan(conf, this, visitor, tableName, row,
this.prefetchRegionLimit, TableName.META_TABLE_NAME);
} catch (IOException e) {
- LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
+ if (ExceptionUtil.isInterrupt(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
+ }
}
}
@@ -1285,6 +1291,8 @@ public class HConnectionManager {
// from the HTable constructor.
throw e;
} catch (IOException e) {
+ ExceptionUtil.rethrowIfInterrupt(e);
+
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
@@ -1561,6 +1569,7 @@ public class HConnectionManager {
try {
zkw = getKeepAliveZooKeeperWatcher();
} catch (IOException e) {
+ ExceptionUtil.rethrowIfInterrupt(e);
throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
}
try {
@@ -1621,7 +1630,7 @@ public class HConnectionManager {
if (exceptionCaught != null)
// It failed. If it's not the last try, we're going to wait a little
- if (tries < numTries) {
+ if (tries < numTries && !ExceptionUtil.isInterrupt(exceptionCaught)) {
// tries at this point is 1 or more; decrement to start from 0.
long pauseTime = ConnectionUtils.getPauseTime(pause, tries - 1);
LOG.info("getMaster attempt " + tries + " of " + numTries +
@@ -1631,7 +1640,7 @@ public class HConnectionManager {
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
- throw new RuntimeException(
+ throw new MasterNotRunningException(
"Thread was interrupted while trying to connect to master.", e);
}
} else {
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1564853&r1=1564852&r2=1564853&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java Wed Feb 5 18:01:11 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.ServiceException;
@@ -130,6 +131,7 @@ public class RpcRetryingCaller<T> {
new RetriesExhaustedException.ThrowableWithExtraContext(t,
EnvironmentEdgeManager.currentTimeMillis(), toString());
exceptions.add(qt);
+ ExceptionUtil.rethrowIfInterrupt(t);
if (tries >= retries - 1) {
throw new RetriesExhaustedException(tries, exceptions);
}
@@ -184,6 +186,7 @@ public class RpcRetryingCaller<T> {
return callable.call();
} catch (Throwable t) {
Throwable t2 = translateException(t);
+ ExceptionUtil.rethrowIfInterrupt(t2);
// It would be nice to clear the location cache here.
if (t2 instanceof IOException) {
throw (IOException)t2;
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1564853&r1=1564852&r2=1564853&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Feb 5 18:01:11 2014
@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
@@ -642,18 +644,19 @@ public class RpcClient {
*/
private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
throws IOException {
-
closeConnection();
// throw the exception if the maximum number of retries is reached
- if (curRetries >= maxRetries) {
+ if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
throw ioe;
}
// otherwise back off and retry
try {
Thread.sleep(failureSleep);
- } catch (InterruptedException ignored) {}
+ } catch (InterruptedException ie) {
+ ExceptionUtil.rethrowIfInterrupt(ie);
+ }
LOG.info("Retrying connect to server: " + remoteId.getAddress() +
" after sleeping " + failureSleep + "ms. Already tried " + curRetries +
@@ -672,7 +675,9 @@ public class RpcClient {
if (timeout>0) {
try {
wait(timeout);
- } catch (InterruptedException ignored) {}
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -1112,6 +1117,8 @@ public class RpcClient {
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
closeException = e;
+ } if (ExceptionUtil.isInterrupt(e)){
+
} else {
// Treat this as a fatal condition and close this connection
markClosed(e);
@@ -1425,24 +1432,14 @@ public class RpcClient {
Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
connection.writeRequest(call, priority); // send the parameter
- boolean interrupted = false;
+
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
while (!call.done) {
if (connection.shouldCloseConnection.get()) {
throw new IOException("Unexpected closed connection");
}
- try {
- call.wait(1000); // wait for the result
- } catch (InterruptedException ignored) {
- // save the fact that we were interrupted
- interrupted = true;
- }
- }
-
- if (interrupted) {
- // set the interrupt flag now that we are done waiting
- Thread.currentThread().interrupt();
+ call.wait(1000); // wait for the result
}
if (call.error != null) {
Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1564853&r1=1564852&r2=1564853&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Feb 5 18:01:11 2014
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
@@ -276,8 +277,11 @@ public final class ProtobufUtil {
if (e == null) {
return new IOException(se);
}
+ if (ExceptionUtil.isInterrupt(e)) {
+ return ExceptionUtil.asInterrupt(e);
+ }
if (e instanceof RemoteException) {
- e = ((RemoteException)e).unwrapRemoteException();
+ e = ((RemoteException) e).unwrapRemoteException();
}
return e instanceof IOException ? (IOException) e : new IOException(se);
}
Added: hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java?rev=1564853&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java (added)
+++ hbase/branches/0.98/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java Wed Feb 5 18:01:11 2014
@@ -0,0 +1,54 @@
+package org.apache.hadoop.hbase.util;
+
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ * Helpers for recognising a thread interruption that surfaces as an exception.
+ * An interruption can reach the caller as any of:
+ * <ul>
+ * <li>{@link InterruptedException};</li>
+ * <li>{@link InterruptedIOException} (inherits IOException), used by blocking IO;</li>
+ * <li>{@link ClosedByInterruptException} (inherits IOException but NOT
+ * InterruptedIOException), thrown when a thread blocked on a channel is interrupted.</li>
+ * </ul>
+ * {@link SocketTimeoutException} inherits InterruptedIOException but is a timeout, not a real
+ * interruption, so every method here explicitly excludes it. This pattern is unfortunately
+ * common.
+ */
+public class ExceptionUtil {
+
+  /**
+   * Tells whether a throwable represents a thread interruption.
+   *
+   * @param t the throwable to inspect; may be null
+   * @return true if the throwable comes from an interruption, false otherwise (including for
+   *         null and for SocketTimeoutException).
+   */
+  public static boolean isInterrupt(Throwable t) {
+    if (t instanceof InterruptedException) return true;
+    // Must be tested before InterruptedIOException: a socket timeout is a subclass of it.
+    if (t instanceof SocketTimeoutException) return false;
+    // ClosedByInterruptException is not an InterruptedIOException, so it needs its own test.
+    return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException);
+  }
+
+  /**
+   * Rethrows the throwable as an InterruptedIOException when it is an interruption.
+   *
+   * @param t the throwable to inspect; may be null
+   * @throws InterruptedIOException if t was an interruption. Does nothing otherwise.
+   */
+  public static void rethrowIfInterrupt(Throwable t) throws InterruptedIOException {
+    InterruptedIOException iie = asInterrupt(t);
+    if (iie != null) throw iie;
+  }
+
+  /**
+   * Converts an interruption into an InterruptedIOException.
+   *
+   * @param t the throwable to inspect; may be null
+   * @return t itself if it already is an InterruptedIOException (and not a socket timeout), a new
+   *         InterruptedIOException with t as its cause if t is an InterruptedException or a
+   *         ClosedByInterruptException, null otherwise
+   */
+  public static InterruptedIOException asInterrupt(Throwable t) {
+    if (t instanceof SocketTimeoutException) return null;
+
+    if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
+
+    if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
+      // Neither type is an InterruptedIOException: wrap it and keep the original as the cause.
+      InterruptedIOException iie = new InterruptedIOException();
+      iie.initCause(t);
+      return iie;
+    }
+
+    return null;
+  }
+
+  // Static utility class: not meant to be instantiated.
+  private ExceptionUtil() {
+  }
+}
Added: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java?rev=1564853&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java (added)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientOperationInterrupt.java Wed Feb 5 18:01:11 2014
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Checks that a client thread blocked in HTable.get() can be interrupted.
+ * The test starts a mini cluster with a coprocessor that delays every Get, launches many
+ * threads that each perform one Get, interrupts half of them and then counts how each Get ended.
+ * NOTE(review): none of the HTable instances created in this class is closed in the visible
+ * code - confirm whether that is intentional.
+ */
+@Category(MediumTests.class)
+public class TestClientOperationInterrupt {
+ private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
+
+ private static HBaseTestingUtility util;
+ private static final byte[] tableName = Bytes.toBytes("test");
+ private static final byte[] dummy = Bytes.toBytes("dummy");
+ private static final byte[] row1 = Bytes.toBytes("r1");
+ private static final byte[] test = Bytes.toBytes("test"); // same bytes as tableName
+ private static Configuration conf;
+
+ /**
+  * Region observer that sleeps in preGetOp so that a Get stays pending long enough for the
+  * client thread to be interrupted (presumably 2500 milliseconds - confirm the unit of
+  * Threads.sleep).
+  */
+ public static class TestCoprocessor extends BaseRegionObserver {
+ @Override
+ public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<Cell> results) throws IOException {
+ Threads.sleep(2500);
+ }
+ }
+
+
+ /**
+  * Starts a mini cluster configured with TestCoprocessor, (re)creates the test table and
+  * writes a single cell into row1 so that the Gets issued by the test have something to read.
+  */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = HBaseConfiguration.create();
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ TestCoprocessor.class.getName());
+ util = new HBaseTestingUtility(conf);
+ util.startMiniCluster();
+
+ HBaseAdmin admin = util.getHBaseAdmin();
+ // Drop any pre-existing table of the same name before creating it.
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ util.createTable(tableName, new byte[][]{dummy, test}); // presumably the column families
+
+ HTable ht = new HTable(conf, tableName);
+ Put p = new Put(row1);
+ p.add(dummy, dummy, dummy);
+ ht.put(p);
+ }
+
+
+ /**
+  * Runs nbThread concurrent Gets, interrupts the first half of the threads right after they
+  * are started, and asserts that exactly half of the Gets completed without exception and
+  * that no unexpected exception type was seen.
+  */
+ @Test
+ public void testInterrupt50Percent() throws IOException, InterruptedException {
+ // noEx: Gets that returned without an IOException.
+ final AtomicInteger noEx = new AtomicInteger(0);
+ // badEx: IOExceptions that are not an InterruptedIOException, or that are a socket timeout.
+ final AtomicInteger badEx = new AtomicInteger(0);
+ // noInt: threads whose interrupt flag was still set after an interruption exception.
+ // NOTE(review): this counter is only reported in the assertion message, never asserted.
+ final AtomicInteger noInt = new AtomicInteger(0);
+ // done: worker threads that have left their try block, whatever the outcome.
+ final AtomicInteger done = new AtomicInteger(0);
+ List<Thread> threads = new ArrayList<Thread>();
+
+ final int nbThread = 100;
+
+ for (int i = 0; i < nbThread; i++) {
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ HTable ht = new HTable(conf, tableName);
+ Result r = ht.get(new Get(row1));
+ noEx.incrementAndGet();
+ } catch (IOException e) {
+ LOG.info("exception", e);
+ if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
+ badEx.incrementAndGet();
+ } else {
+ if (Thread.currentThread().isInterrupted()) {
+ noInt.incrementAndGet();
+ LOG.info("The thread should NOT be with the 'interrupt' status.");
+ }
+ }
+ } finally {
+ done.incrementAndGet();
+ }
+ }
+ };
+ t.setName("TestClientOperationInterrupt #" + i);
+ threads.add(t);
+ t.start();
+ }
+
+ // Interrupt the first half immediately, without waiting for the threads to reach get().
+ for (int i = 0; i < nbThread / 2; i++) {
+ threads.get(i).interrupt();
+ }
+
+
+ // Poll until no worker thread is alive any more.
+ boolean stillAlive = true;
+ while (stillAlive) {
+ stillAlive = false;
+ for (Thread t : threads) {
+ if (t.isAlive()) {
+ stillAlive = true;
+ }
+ }
+ Threads.sleep(10);
+ }
+
+ // The interruptions were aimed at the workers only; this thread must be untouched.
+ Assert.assertFalse(Thread.currentThread().isInterrupted());
+
+ Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
+ noEx.get() == nbThread / 2 && badEx.get() == 0);
+
+ // The problem here is that we need the server to free its handlers to handle all operations
+ // NOTE(review): every worker has already terminated at this point and increments 'done' in
+ // its finally block, so this loop looks like it exits immediately - confirm it is needed.
+ while (done.get() != nbThread){
+ Thread.sleep(1);
+ }
+
+ // A fresh Get from this (non-interrupted) thread must still return the row.
+ HTable ht = new HTable(conf, tableName);
+ Result r = ht.get(new Get(row1));
+ Assert.assertFalse(r.isEmpty());
+ }
+
+ /** Shuts down the mini cluster started in setUpBeforeClass. */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}