Posted to commits@hbase.apache.org by te...@apache.org on 2016/10/14 16:07:45 UTC

hbase git commit: HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any

Repository: hbase
Updated Branches:
  refs/heads/master 07086036a -> 39d43ab77


HBASE-16821 Enhance LoadIncrementalHFiles API to convey missing hfiles if any


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39d43ab7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39d43ab7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39d43ab7

Branch: refs/heads/master
Commit: 39d43ab779a90f28273426ee2887daacaa6b1f48
Parents: 0708603
Author: tedyu <yu...@gmail.com>
Authored: Fri Oct 14 09:07:38 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Oct 14 09:07:38 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 83 +++++++++++++-------
 .../mapreduce/TestLoadIncrementalHFiles.java    |  7 +-
 .../TestLoadIncrementalHFilesSplitRecovery.java | 20 ++---
 3 files changed, 70 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
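
For context, a minimal caller sketch (not part of this commit) showing how the
enhanced doBulkLoad(Map, ...) return value might be consumed. The table name
"t1", the family "f" and the /staging hfile path are illustrative assumptions.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadCaller {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("t1");             // illustrative
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("f"),                       // illustrative family
        Arrays.asList(new Path("/staging/f/hfile_0")));        // illustrative path
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // Per this change, doBulkLoad returns the names of hfiles that were
      // not found, or null when the load queue was empty.
      List<String> missing = loader.doBulkLoad(family2Files, admin, table,
          locator, false /* silence */, false /* copyFile */);
      if (missing != null && !missing.isEmpty()) {
        System.err.println("HFiles not found during bulk load: " + missing);
      }
    }
  }
}
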


http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index ccf44da..3647637 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -362,9 +362,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
    * @param copyFile always copy hfiles if true
+   * @return List of filenames which were not found
    * @throws TableNotFoundException if table does not yet exist
    */
-  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+  public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
           RegionLocator regionLocator, boolean silence, boolean copyFile)
               throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
@@ -379,7 +380,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       prepareHFileQueue(map, table, queue, silence);
       if (queue.isEmpty()) {
         LOG.warn("Bulk load operation did not get any files to load");
-        return;
+        return null;
       }
       pool = createExecutorService();
       secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
@@ -389,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           break;
         }
       }
-      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
     } finally {
       cleanup(admin, queue, pool, secureClient);
     }
@@ -448,7 +449,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
-  void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+  List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
       Deque<LoadQueueItem> queue, ExecutorService pool,
       SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
     int count = 0;
@@ -463,6 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // fs is the source filesystem
     fsDelegationToken.acquireDelegationToken(fs);
     bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null;
 
     // Assumes that region splits can happen while this occurs.
     while (!queue.isEmpty()) {
@@ -482,8 +484,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       count++;
 
       // Using ByteBuffer for byte[] equality semantics
-      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-          pool, queue, startEndKeys);
+      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
 
       if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
         // Error is logged inside checkHFilesCountPerRegionPerFamily.
@@ -502,6 +504,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw new RuntimeException("Bulk load aborted with some files not yet loaded."
         + "Please check log for more details.");
     }
+    if (pair == null) return null;
+    return pair.getSecond();
   }
 
   /**
@@ -625,7 +629,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     try {
       pool = createExecutorService();
       Multimap<ByteBuffer, LoadQueueItem> regionGroups =
-          groupOrSplitPhase(table, pool, queue, startEndKeys);
+          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
       bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
     } finally {
       if (pool != null) {
@@ -709,25 +713,34 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   /**
-   * @return A map that groups LQI by likely bulk load region targets.
+   * @param table the table to load into
+   * @param pool the ExecutorService
+   * @param queue the queue for LoadQueueItem
+   * @param startEndKeys start and end keys
+   * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles.
    */
-  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
-      ExecutorService pool, Deque<LoadQueueItem> queue,
+  private Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase(
+      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
       final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
     // <region start key, LQI> need synchronized only within this scope of this
     // phase because of the puts that happen in futures.
     Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+    List<String> missingHFiles = new ArrayList<>();
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = new Pair<>(regionGroups,
+        missingHFiles);
 
     // drain LQIs and figure out bulk load groups
-    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>();
+    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
 
-      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+      final Callable<Pair<List<LoadQueueItem>, String>> call =
+          new Callable<Pair<List<LoadQueueItem>, String>>() {
         @Override
-        public List<LoadQueueItem> call() throws Exception {
-          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
+        public Pair<List<LoadQueueItem>, String> call() throws Exception {
+          Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
+              startEndKeys);
           return splits;
         }
       };
@@ -735,11 +748,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
     // get all the results.  All grouping and splitting must finish before
     // we can attempt the atomic loads.
-    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
+    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
       try {
-        List<LoadQueueItem> splits = lqis.get();
+        Pair<List<LoadQueueItem>, String> splits = lqis.get();
         if (splits != null) {
-          queue.addAll(splits);
+          if (splits.getFirst() != null) {
+            queue.addAll(splits.getFirst());
+          } else {
+            missingHFiles.add(splits.getSecond());
+          }
         }
       } catch (ExecutionException e1) {
         Throwable t = e1.getCause();
@@ -754,7 +771,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
       }
     }
-    return regionGroups;
+    return pair;
   }
 
   // unique file name for the table
@@ -817,17 +834,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * protected for testing
    * @throws IOException if an IO failure is encountered
    */
-  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-      final LoadQueueItem item, final Table table,
-      final Pair<byte[][], byte[][]> startEndKeys)
-      throws IOException {
+  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
     final Path hfilePath = item.hfilePath;
     // fs is the source filesystem
     if (fs == null) {
       fs = hfilePath.getFileSystem(getConf());
     }
-    HFile.Reader hfr = HFile.createReader(fs, hfilePath,
-        new CacheConfig(getConf()), getConf());
+    HFile.Reader hfr = null;
+    try {
+      hfr = HFile.createReader(fs, hfilePath,
+          new CacheConfig(getConf()), getConf());
+    } catch (FileNotFoundException fnfe) {
+      LOG.debug("encountered", fnfe);
+      return new Pair<>(null, hfilePath.getName());
+    }
     final byte[] first, last;
     try {
       hfr.loadFileInfo();
@@ -890,7 +912,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       List<LoadQueueItem> lqis = splitStoreFile(item, table,
           startEndKeys.getFirst()[indexForCallable],
           startEndKeys.getSecond()[indexForCallable]);
-      return lqis;
+      return new Pair<>(lqis, null);
     }
 
     // group regions.
@@ -1171,7 +1193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
+  public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1197,13 +1219,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
         if (dirPath != null) {
           doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
+          return null;
         } else {
-          doBulkLoad(map, admin, table, locator, silence, copyFiles);
+          return doBulkLoad(map, admin, table, locator, silence, copyFiles);
         }
       }
     }
-
-    return 0;
   }
 
   @Override
@@ -1215,7 +1236,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
-    return run(dirPath, null, tableName);
+    List<String> missingHFiles = run(dirPath, null, tableName);
+    if (missingHFiles == null) return 0;
+    return -1;
   }
 
   public static void main(String[] args) throws Exception {
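
For illustration (not part of this commit): the reworked groupOrSplit returns a
Pair whose first element, when non-null, carries the split items to re-queue;
otherwise the second element names the missing hfile. A sketch of a subclass
observing that contract, in the style of the test overrides below; the class
name is an assumption, and the unshaded Guava Multimap of the time is assumed.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import com.google.common.collect.Multimap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Pair;

public class ObservingLoader extends LoadIncrementalHFiles {
  public ObservingLoader(Configuration conf) throws Exception {
    super(conf);
  }

  @Override
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
      final Table table, final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    Pair<List<LoadQueueItem>, String> result =
        super.groupOrSplit(regionGroups, item, table, startEndKeys);
    // First element non-null: further LQIs produced by splitting the hfile.
    // First element null with non-null second: the hfile itself was missing.
    if (result != null && result.getFirst() == null && result.getSecond() != null) {
      System.err.println("Missing hfile reported: " + result.getSecond());
    }
    return result;
  }
}
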

http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 88b9247..fe7abcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -323,12 +323,14 @@ public class TestLoadIncrementalHFiles {
       map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
       map.put(FAMILY, list);
     }
+    Path last = null;
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
       Path path = new Path(familyDir, "hfile_" + hfileIdx++);
       HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
       if (useMap) {
+        last = path;
         list.add(path);
       }
     }
@@ -346,7 +348,10 @@ public class TestLoadIncrementalHFiles {
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
     String [] args= {dir.toString(), tableName.toString()};
     if (useMap) {
-      loader.run(null, map, tableName);
+      fs.delete(last);
+      List<String> missingHFiles = loader.run(null, map, tableName);
+      expectedRows -= 1000;
+      assertTrue(missingHFiles.contains(last.getName()));
     } else {
       loader.run(args);
     }
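
The test above deletes the last hfile before the map-based load and asserts its
name is reported back. A condensed, self-contained sketch (table, family and
paths are illustrative, not from the test) of that map-based entry point:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBasedBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Map<byte[], List<Path>> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    map.put(Bytes.toBytes("f"),                         // illustrative family
        Arrays.asList(new Path("/staging/f/hfile_0"),
            new Path("/staging/f/hfile_1")));           // may no longer exist
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // dirPath == null selects the map-based branch, whose missing-file
    // list is now returned; the dirPath branch still returns null.
    List<String> missing = loader.run(null, map, TableName.valueOf("t1"));
    if (missing != null && !missing.isEmpty()) {
      System.err.println("Skipped (not found): " + missing);
    }
  }
}
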

http://git-wip-us.apache.org/repos/asf/hbase/blob/39d43ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 2060726..a1ed832 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -404,13 +404,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
       LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
           util.getConfiguration()) {
         @Override
-        protected List<LoadQueueItem> groupOrSplit(
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
             Multimap<ByteBuffer, LoadQueueItem> regionGroups,
             final LoadQueueItem item, final Table htable,
             final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-          List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
-          if (lqis != null) {
-            countedLqis.addAndGet(lqis.size());
+          Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+              startEndKeys);
+          if (lqis != null && lqis.getFirst() != null) {
+            countedLqis.addAndGet(lqis.getFirst().size());
           }
           return lqis;
         }
@@ -479,7 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
         int i = 0;
 
         @Override
-        protected List<LoadQueueItem> groupOrSplit(
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
             Multimap<ByteBuffer, LoadQueueItem> regionGroups,
             final LoadQueueItem item, final Table table,
             final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
@@ -521,13 +522,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
 
       @Override
-      protected List<LoadQueueItem> groupOrSplit(
+      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
           Multimap<ByteBuffer, LoadQueueItem> regionGroups,
           final LoadQueueItem item, final Table htable,
           final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
-        if (lqis != null) {
-          countedLqis.addAndGet(lqis.size());
+        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+            startEndKeys);
+        if (lqis != null && lqis.getFirst() != null) {
+          countedLqis.addAndGet(lqis.getFirst().size());
         }
         return lqis;
       }
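
Per the run(String[]) change above, a non-null missing-file list now maps to
exit code -1. A sketch of driving the tool through ToolRunner; the driver
class name is an assumption.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    // args: <hfile directory> <table name>
    Configuration conf = HBaseConfiguration.create();
    int exit = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    // run(String[]) now returns -1 when run(dirPath, map, tableName)
    // hands back a non-null list of missing hfiles, and 0 otherwise.
    System.exit(exit);
  }
}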