Posted to commits@hbase.apache.org by te...@apache.org on 2016/10/14 16:07:45 UTC
hbase git commit: HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any
Repository: hbase
Updated Branches:
refs/heads/master 07086036a -> 39d43ab77
HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39d43ab7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39d43ab7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39d43ab7
Branch: refs/heads/master
Commit: 39d43ab779a90f28273426ee2887daacaa6b1f48
Parents: 0708603
Author: tedyu <yu...@gmail.com>
Authored: Fri Oct 14 09:07:38 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Oct 14 09:07:38 2016 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 83 +++++++++++++-------
.../mapreduce/TestLoadIncrementalHFiles.java | 7 +-
.../TestLoadIncrementalHFilesSplitRecovery.java | 20 ++---
3 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
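For context, a minimal caller sketch showing how the new List<String> return value of
run()/doBulkLoad() can be checked for hfiles that were not found. The class name, table
name, column family, and hfile path below are illustrative only and are not part of this
commit:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BulkLoadWithMissingCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Map of column family -> hfile paths to load (values are illustrative).
        Map<byte[], List<Path>> family2Files =
            new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
        family2Files.put(Bytes.toBytes("cf"),
            Collections.singletonList(new Path("/staging/cf/hfile_0")));
        // With a null dirPath the map is used; the return value lists the names of
        // hfiles that could not be found (null if nothing was queued for loading).
        List<String> missing = loader.run(null, family2Files, TableName.valueOf("t1"));
        if (missing != null && !missing.isEmpty()) {
          System.err.println("Missing hfiles: " + missing);
        }
      }
    }

Note that run() returns null when a directory path is supplied (or when no files were
queued), so callers should null-check the result before inspecting it.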
http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index ccf44da..3647637 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -362,9 +362,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @param copyFile always copy hfiles if true
+ * @return List of filenames which were not found
* @throws TableNotFoundException if table does not yet exist
*/
- public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+ public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence, boolean copyFile)
throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
@@ -379,7 +380,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
prepareHFileQueue(map, table, queue, silence);
if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not get any files to load");
- return;
+ return null;
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
@@ -389,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
break;
}
}
- performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+ return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@@ -448,7 +449,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
- void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+ List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue, ExecutorService pool,
SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
int count = 0;
@@ -463,6 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null;
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
@@ -482,8 +484,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
count++;
// Using ByteBuffer for byte[] equality semantics
- Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
- pool, queue, startEndKeys);
+ pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
// Error is logged inside checkHFilesCountPerRegionPerFamily.
@@ -502,6 +504,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+ "Please check log for more details.");
}
+ if (pair == null) return null;
+ return pair.getSecond();
}
/**
@@ -625,7 +629,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
pool = createExecutorService();
Multimap<ByteBuffer, LoadQueueItem> regionGroups =
- groupOrSplitPhase(table, pool, queue, startEndKeys);
+ groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
} finally {
if (pool != null) {
@@ -709,25 +713,34 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
- * @return A map that groups LQI by likely bulk load region targets.
+ * @param table the table to load into
+ * @param pool the ExecutorService
+ * @param queue the queue for LoadQueueItem
+ * @param startEndKeys start and end keys
+ * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles.
*/
- private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
- ExecutorService pool, Deque<LoadQueueItem> queue,
+ private Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase(
+ final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
// <region start key, LQI> need synchronized only within this scope of this
// phase because of the puts that happen in futures.
Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+ List<String> missingHFiles = new ArrayList<>();
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = new Pair<>(regionGroups,
+ missingHFiles);
// drain LQIs and figure out bulk load groups
- Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>();
+ Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();
- final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+ final Callable<Pair<List<LoadQueueItem>, String>> call =
+ new Callable<Pair<List<LoadQueueItem>, String>>() {
@Override
- public List<LoadQueueItem> call() throws Exception {
- List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+ public Pair<List<LoadQueueItem>, String> call() throws Exception {
+ Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
+ startEndKeys);
return splits;
}
};
@@ -735,11 +748,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
// get all the results. All grouping and splitting must finish before
// we can attempt the atomic loads.
- for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+ for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
try {
- List<LoadQueueItem> splits = lqis.get();
+ Pair<List<LoadQueueItem>, String> splits = lqis.get();
if (splits != null) {
- queue.addAll(splits);
+ if (splits.getFirst() != null) {
+ queue.addAll(splits.getFirst());
+ } else {
+ missingHFiles.add(splits.getSecond());
+ }
}
} catch (ExecutionException e1) {
Throwable t = e1.getCause();
@@ -754,7 +771,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
}
}
- return regionGroups;
+ return pair;
}
// unique file name for the table
@@ -817,17 +834,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* protected for testing
* @throws IOException if an IO failure is encountered
*/
- protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- final LoadQueueItem item, final Table table,
- final Pair<byte[][], byte[][]> startEndKeys)
- throws IOException {
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
+ final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
final Path hfilePath = item.hfilePath;
// fs is the source filesystem
if (fs == null) {
fs = hfilePath.getFileSystem(getConf());
}
- HFile.Reader hfr = HFile.createReader(fs, hfilePath,
- new CacheConfig(getConf()), getConf());
+ HFile.Reader hfr = null;
+ try {
+ hfr = HFile.createReader(fs, hfilePath,
+ new CacheConfig(getConf()), getConf());
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("encountered", fnfe);
+ return new Pair<>(null, hfilePath.getName());
+ }
final byte[] first, last;
try {
hfr.loadFileInfo();
@@ -890,7 +912,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
List<LoadQueueItem> lqis = splitStoreFile(item, table,
startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
- return lqis;
+ return new Pair<>(lqis, null);
}
// group regions.
@@ -1171,7 +1193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.info("Table "+ tableName +" is available!!");
}
- public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
+ public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
initialize();
try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) {
@@ -1197,13 +1219,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
+ return null;
} else {
- doBulkLoad(map, admin, table, locator, silence, copyFiles);
+ return doBulkLoad(map, admin, table, locator, silence, copyFiles);
}
}
}
-
- return 0;
}
@Override
@@ -1215,7 +1236,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
- return run(dirPath, null, tableName);
+ List<String> missingHFiles = run(dirPath, null, tableName);
+ if (missingHFiles == null) return 0;
+ return -1;
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 88b9247..fe7abcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -323,12 +323,14 @@ public class TestLoadIncrementalHFiles {
map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
map.put(FAMILY, list);
}
+ Path last = null;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
Path path = new Path(familyDir, "hfile_" + hfileIdx++);
HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
if (useMap) {
+ last = path;
list.add(path);
}
}
@@ -346,7 +348,10 @@ public class TestLoadIncrementalHFiles {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
- loader.run(null, map, tableName);
+ fs.delete(last);
+ List<String> missingHFiles = loader.run(null, map, tableName);
+ expectedRows -= 1000;
+ assertTrue(missingHFiles.contains(last.getName()));
} else {
loader.run(args);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 2060726..a1ed832 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -404,13 +404,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
util.getConfiguration()) {
@Override
- protected List<LoadQueueItem> groupOrSplit(
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
- if (lqis != null) {
- countedLqis.addAndGet(lqis.size());
+ Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+ startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
}
return lqis;
}
@@ -479,7 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
int i = 0;
@Override
- protected List<LoadQueueItem> groupOrSplit(
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table table,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
@@ -521,13 +522,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
- protected List<LoadQueueItem> groupOrSplit(
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(
Multimap<ByteBuffer, LoadQueueItem> regionGroups,
final LoadQueueItem item, final Table htable,
final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
- if (lqis != null) {
- countedLqis.addAndGet(lqis.size());
+ Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+ startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
}
return lqis;
}