Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/20 16:38:25 UTC
hbase git commit: HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input - addendum adheres to original cleanup logic
Repository: hbase
Updated Branches:
refs/heads/master 348eb2834 -> 08d9a2b66
HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input - addendum adheres to original cleanup logic
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/08d9a2b6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/08d9a2b6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/08d9a2b6
Branch: refs/heads/master
Commit: 08d9a2b6629162b0cd031e4473dbbf319182b51a
Parents: 348eb28
Author: tedyu <yu...@gmail.com>
Authored: Tue Sep 20 09:38:18 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Sep 20 09:38:18 2016 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 154 ++++++++++---------
1 file changed, 84 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
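For context, the parent change (HBASE-16646) added a doBulkLoad() overload that takes a map of column family to store file paths, so callers can bulk load specific HFiles without staging them in a family-per-subdirectory layout. A minimal caller-side sketch, assuming the map-based overload's signature as introduced by the parent change; the table name and file paths below are hypothetical:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BulkLoadByPathExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("usertable"); // hypothetical table
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             Table table = conn.getTable(tableName);
             RegionLocator locator = conn.getRegionLocator(tableName)) {
          // Map each column family to the store files to load; no
          // family-named subdirectory layout is required for this overload.
          Map<byte[], List<Path>> family2Files = new HashMap<>();
          family2Files.put(Bytes.toBytes("cf"), Arrays.asList(
              new Path("/staging/f1.hfile"),   // hypothetical paths
              new Path("/staging/f2.hfile")));
          LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
          loader.doBulkLoad(family2Files, admin, table, locator, false);
        }
      }
    }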
http://git-wip-us.apache.org/repos/asf/hbase/blob/08d9a2b6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f775b82..6dea477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -333,6 +333,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
doBulkLoad(hfofDir, admin, table, regionLocator, false);
}
+ void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+ SecureBulkLoadClient secureClient) throws IOException {
+ fsDelegationToken.releaseDelegationToken();
+ if (bulkToken != null && secureClient != null) {
+ secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
+ }
+ if (pool != null) {
+ pool.shutdown();
+ }
+ if (!queue.isEmpty()) {
+ StringBuilder err = new StringBuilder();
+ err.append("-------------------------------------------------\n");
+ err.append("Bulk load aborted with some files not yet loaded:\n");
+ err.append("-------------------------------------------------\n");
+ for (LoadQueueItem q : queue) {
+ err.append(" ").append(q.hfilePath).append('\n');
+ }
+ LOG.error(err);
+ }
+ }
/**
* Perform a bulk load of the given directory into the given
* pre-existing table. This method is not threadsafe.
@@ -352,12 +372,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
- prepareHFileQueue(map, table, queue, silence);
- if (queue.isEmpty()) {
- LOG.warn("Bulk load operation did not get any files to load");
- return;
+ ExecutorService pool = null;
+ SecureBulkLoadClient secureClient = null;
+ try {
+ prepareHFileQueue(map, table, queue, silence);
+ if (queue.isEmpty()) {
+ LOG.warn("Bulk load operation did not get any files to load");
+ return;
+ }
+ pool = createExecutorService();
+ secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+ performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+ } finally {
+ cleanup(admin, queue, pool, secureClient);
}
- performBulkLoad(admin, table, regionLocator, queue);
}
/**
@@ -392,87 +420,73 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
- prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+ ExecutorService pool = null;
+ SecureBulkLoadClient secureClient = null;
+ try {
+ prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
- if (queue.isEmpty()) {
- LOG.warn("Bulk load operation did not find any files to load in " +
- "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
- "subdirectories that correspond to column family names?");
- return;
+ if (queue.isEmpty()) {
+ LOG.warn("Bulk load operation did not find any files to load in " +
+ "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
+ "subdirectories that correspond to column family names?");
+ return;
+ }
+ pool = createExecutorService();
+ secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+ performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+ } finally {
+ cleanup(admin, queue, pool, secureClient);
}
- performBulkLoad(admin, table, regionLocator, queue);
}
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
- Deque<LoadQueueItem> queue) throws IOException {
- ExecutorService pool = createExecutorService();
+ Deque<LoadQueueItem> queue, ExecutorService pool,
+ SecureBulkLoadClient secureClient) throws IOException {
+ int count = 0;
- SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+ if(isSecureBulkLoadEndpointAvailable()) {
+ LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+ LOG.warn("Secure bulk load has been integrated into HBase core.");
+ }
- try {
- int count = 0;
+ //If using secure bulk load, get source delegation token, and
+ //prepare staging directory and token
+ // fs is the source filesystem
+ fsDelegationToken.acquireDelegationToken(fs);
+ bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
- if(isSecureBulkLoadEndpointAvailable()) {
- LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
- LOG.warn("Secure bulk load has been integrated into HBase core.");
+ // Assumes that region splits can happen while this occurs.
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+ if (count != 0) {
+ LOG.info("Split occured while grouping HFiles, retry attempt " +
+ + count + " with " + queue.size() + " files remaining to group or split");
}
- //If using secure bulk load, get source delegation token, and
- //prepare staging directory and token
- // fs is the source filesystem
- fsDelegationToken.acquireDelegationToken(fs);
- bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
-
- // Assumes that region splits can happen while this occurs.
- while (!queue.isEmpty()) {
- // need to reload split keys each iteration.
- final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
- if (count != 0) {
- LOG.info("Split occured while grouping HFiles, retry attempt " +
- + count + " with " + queue.size() + " files remaining to group or split");
- }
-
- int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
- maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
- if (maxRetries != 0 && count >= maxRetries) {
- throw new IOException("Retry attempted " + count +
+ int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+ maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+ if (maxRetries != 0 && count >= maxRetries) {
+ throw new IOException("Retry attempted " + count +
" times without completing, bailing out");
- }
- count++;
+ }
+ count++;
- // Using ByteBuffer for byte[] equality semantics
- Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
- pool, queue, startEndKeys);
+ // Using ByteBuffer for byte[] equality semantics
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+ pool, queue, startEndKeys);
- if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
- // Error is logged inside checkHFilesCountPerRegionPerFamily.
- throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+ if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+ // Error is logged inside checkHFilesCountPerRegionPerFamily.
+ throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+ " hfiles to one family of one region");
- }
-
- bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
-
- // NOTE: The next iteration's split / group could happen in parallel to
- // atomic bulkloads assuming that there are splits and no merges, and
- // that we can atomically pull out the groups we want to retry.
}
- } finally {
- fsDelegationToken.releaseDelegationToken();
- if(bulkToken != null) {
- secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
- }
- pool.shutdown();
- if (!queue.isEmpty()) {
- StringBuilder err = new StringBuilder();
- err.append("-------------------------------------------------\n");
- err.append("Bulk load aborted with some files not yet loaded:\n");
- err.append("-------------------------------------------------\n");
- for (LoadQueueItem q : queue) {
- err.append(" ").append(q.hfilePath).append('\n');
- }
- LOG.error(err);
- }
+ bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+
+ // NOTE: The next iteration's split / group could happen in parallel to
+ // atomic bulkloads assuming that there are splits and no merges, and
+ // that we can atomically pull out the groups we want to retry.
}
if (!queue.isEmpty()) {
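Taken together, the hunks above leave both doBulkLoad() overloads funneling through the same shutdown path: the token release, secure bulk load session cleanup, pool shutdown, and unloaded-file report that performBulkLoad() previously did inline in its own finally block. A condensed view of the resulting pattern, assembled from the diff rather than copied verbatim:

    // Condensed from the hunks above; both overloads now follow this shape.
    Deque<LoadQueueItem> queue = new LinkedList<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
      if (queue.isEmpty()) {
        return; // nothing to load
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
    } finally {
      // cleanup() releases the delegation token, ends the secure bulk load
      // session, shuts down the pool, and logs any files still queued,
      // matching the cleanup performBulkLoad() carried before this addendum.
      cleanup(admin, queue, pool, secureClient);
    }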