You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/11 23:20:56 UTC
hbase git commit: HBASE-17165 Add retry to LoadIncrementalHFiles tool
Repository: hbase
Updated Branches:
refs/heads/master 471cf13b1 -> f7d0f15c9
HBASE-17165 Add retry to LoadIncrementalHFiles tool
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f7d0f15c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f7d0f15c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f7d0f15c
Branch: refs/heads/master
Commit: f7d0f15c99e7eacb487ba9e06cfa42ecc4d41263
Parents: 471cf13
Author: Mike Grimes <gr...@amazon.com>
Authored: Mon Nov 21 14:34:07 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Jan 11 15:20:48 2017 -0800
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 136 +++++++++++--------
.../TestLoadIncrementalHFilesSplitRecovery.java | 69 ++++++++--
2 files changed, 136 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d0f15c/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 980dcb1..963c4a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
@@ -110,6 +111,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private boolean initalized = false;
public static final String NAME = "completebulkload";
+ static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
public static final String MAX_FILES_PER_REGION_PER_FAMILY
= "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
@@ -133,6 +135,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private UserProvider userProvider;
private int nrThreads;
private RpcControllerFactory rpcControllerFactory;
+ private AtomicInteger numRetries;
private Map<LoadQueueItem, ByteBuffer> retValue = null;
@@ -158,6 +161,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
initalized = true;
+ numRetries = new AtomicInteger(1);
}
private void usage() {
@@ -510,6 +514,69 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return item2RegionMap;
}
+ protected ClientServiceCallable<byte[]> buildClientServiceCallable(final Connection conn,
+ TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+
+ final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
+ for (LoadQueueItem lqi : lqis) {
+ famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+ }
+
+ return new ClientServiceCallable<byte[]>(conn,
+ tableName, first, rpcControllerFactory.newController()) {
+ @Override
+ protected byte[] rpcCall() throws Exception {
+ SecureBulkLoadClient secureClient = null;
+ boolean success = false;
+ try {
+ LOG.debug("Going to connect to server " + getLocation() + " for row "
+ + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ try (Table table = conn.getTable(getTableName())) {
+ secureClient = new SecureBulkLoadClient(getConf(), table);
+ success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+ assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+ }
+ return success ? regionName : null;
+ } finally {
+ //Best effort copying of files that might not have been imported
+ //from the staging directory back to original location
+ //in user directory
+ if (secureClient != null && !success) {
+ FileSystem targetFs = FileSystem.get(getConf());
+ // fs is the source filesystem
+ if (fs == null) {
+ fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+ }
+ // Check to see if the source and target filesystems are the same
+ // If they are the same filesystem, we will try to move the files back
+ // because previously we moved them to the staging directory.
+ if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+ for (Pair<byte[], String> el : famPaths) {
+ Path hfileStagingPath = null;
+ Path hfileOrigPath = new Path(el.getSecond());
+ try {
+ hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
+ hfileOrigPath.getName());
+ if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+ LOG.debug("Moved back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ } else if (targetFs.exists(hfileStagingPath)) {
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath);
+ }
+ } catch (Exception ex) {
+ LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+ hfileStagingPath, ex);
+ }
+ }
+ }
+ }
+ }
+ }
+ };
+ }
+
/**
* Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
* passed directory and validates whether the prepared queue has all the valid table column
@@ -655,11 +722,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
final byte[] first = e.getKey().array();
final Collection<LoadQueueItem> lqis = e.getValue();
+ final ClientServiceCallable<byte[]> serviceCallable =
+ buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
+
final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
@Override
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> toRetry =
- tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
+ tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
return toRetry;
}
};
@@ -946,75 +1016,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @return empty list if success, list of items to retry on recoverable
* failure
*/
- protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
- boolean copyFile) throws IOException {
+ protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
+ final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
+ throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
}
- final ClientServiceCallable<byte[]> svrCallable = new ClientServiceCallable<byte[]>(conn,
- tableName, first, rpcControllerFactory.newController()) {
- @Override
- protected byte[] rpcCall() throws Exception {
- SecureBulkLoadClient secureClient = null;
- boolean success = false;
- try {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(getConf(), table);
- success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
- }
- return success ? regionName : null;
- } finally {
- //Best effort copying of files that might not have been imported
- //from the staging directory back to original location
- //in user directory
- if (secureClient != null && !success) {
- FileSystem targetFs = FileSystem.get(getConf());
- // fs is the source filesystem
- if(fs == null) {
- fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
- }
- // Check to see if the source and target filesystems are the same
- // If they are the same filesystem, we will try move the files back
- // because previously we moved them to the staging directory.
- if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
- for(Pair<byte[], String> el : famPaths) {
- Path hfileStagingPath = null;
- Path hfileOrigPath = new Path(el.getSecond());
- try {
- hfileStagingPath= new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
- hfileOrigPath.getName());
- if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
- LOG.debug("Moved back file " + hfileOrigPath + " from " +
- hfileStagingPath);
- } else if(targetFs.exists(hfileStagingPath)){
- LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath);
- }
- } catch(Exception ex) {
- LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
- hfileStagingPath, ex);
- }
- }
- }
- }
- }
- }
- };
-
try {
List<LoadQueueItem> toRetry = new ArrayList<>();
Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf,
null).<byte[]> newCaller()
- .callWithRetries(svrCallable, Integer.MAX_VALUE);
+ .callWithRetries(serviceCallable, Integer.MAX_VALUE);
if (region == null) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "
@@ -1026,7 +1042,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return toRetry;
} catch (IOException e) {
LOG.error("Encountered unrecoverable error from region server, additional details: "
- + svrCallable.getExceptionMessageAdditionalDetail(), e);
+ + serviceCallable.getExceptionMessageAdditionalDetail(), e);
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d0f15c/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 8337da8..a0bac77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,7 +58,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -275,31 +276,34 @@ public class TestLoadIncrementalHFilesSplitRecovery {
*/
@Test(expected=IOException.class, timeout=120000)
public void testBulkLoadPhaseFailure() throws Exception {
- TableName table = TableName.valueOf("bulkLoadPhaseFailure");
+ final TableName table = TableName.valueOf("bulkLoadPhaseFailure");
final AtomicInteger attmptedCalls = new AtomicInteger();
final AtomicInteger failedCalls = new AtomicInteger();
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
+ try (Connection connection = ConnectionFactory.createConnection(util
+ .getConfiguration())) {
setupTable(connection, table, 10);
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+ LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
@Override
- protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
- boolean copyFile) throws IOException {
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+ Collection<LoadQueueItem> lqis) throws IOException {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
Connection errConn;
try {
errConn = getMockedConnection(util.getConfiguration());
+ serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
} catch (Exception e) {
LOG.fatal("mocking cruft, should never happen", e);
throw new RuntimeException("mocking cruft, should never happen");
}
failedCalls.incrementAndGet();
- return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
}
- return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
+ return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
}
};
try {
@@ -318,6 +322,53 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
}
+ /**
+ * Test that shows that an exception thrown from the RS side will result in the
+ * expected number of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
+ * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
+ */
+ @Test
+ public void testRetryOnIOException() throws Exception {
+ final TableName table = TableName.valueOf("retryOnIOException");
+ final AtomicInteger calls = new AtomicInteger(1);
+ final Connection conn = ConnectionFactory.createConnection(util
+ .getConfiguration());
+ util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ util.getConfiguration().setBoolean(
+ LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+ final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+ util.getConfiguration()) {
+ @Override
+ protected List<LoadQueueItem> tryAtomicRegionLoad(
+ ClientServiceCallable<byte[]> serverCallable, TableName tableName,
+ final byte[] first, Collection<LoadQueueItem> lqis)
+ throws IOException {
+ if (calls.getAndIncrement() < util.getConfiguration().getInt(
+ HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
+ ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
+ conn, tableName, first, new RpcControllerFactory(
+ util.getConfiguration()).newController()) {
+ @Override
+ public byte[] rpcCall() throws Exception {
+ throw new IOException("Error calling something on RegionServer");
+ }
+ };
+ return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+ } else {
+ return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+ }
+ }
+ };
+ setupTable(conn, table, 10);
+ Path dir = buildBulkFiles(table, 1);
+ lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
+ conn.getRegionLocator(table));
+ util.getConfiguration().setBoolean(
+ LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+ }
+
@SuppressWarnings("deprecation")
private ClusterConnection getMockedConnection(final Configuration conf)
throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {