Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/11 23:20:56 UTC

hbase git commit: HBASE-17165 Add retry to LoadIncrementalHFiles tool

Repository: hbase
Updated Branches:
  refs/heads/master 471cf13b1 -> f7d0f15c9


HBASE-17165 Add retry to LoadIncrementalHFiles tool
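
Adds a new boolean config key, hbase.bulkload.retries.retryOnIOException
(default false). Per the new testRetryOnIOException below, setting it makes
the tool retry a bulk load that fails with an IOException from the region
server, up to the limit configured via HConstants#HBASE_CLIENT_RETRIES_NUMBER,
instead of failing on the first unrecoverable error.

A minimal sketch of enabling the behavior (the two property names come from
this diff; the retry count of 5 is an illustrative value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    Configuration conf = HBaseConfiguration.create();
    // Bound the number of client retries (illustrative value).
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    // New flag from this commit: retry the load when the RS throws an IOException.
    conf.setBoolean("hbase.bulkload.retries.retryOnIOException", true);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);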

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f7d0f15c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f7d0f15c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f7d0f15c

Branch: refs/heads/master
Commit: f7d0f15c99e7eacb487ba9e06cfa42ecc4d41263
Parents: 471cf13
Author: Mike Grimes <gr...@amazon.com>
Authored: Mon Nov 21 14:34:07 2016 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Jan 11 15:20:48 2017 -0800

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 136 +++++++++++--------
 .../TestLoadIncrementalHFilesSplitRecovery.java |  69 ++++++++--
 2 files changed, 136 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d0f15c/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 980dcb1..963c4a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
@@ -110,6 +111,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private boolean initalized = false;
 
   public static final String NAME = "completebulkload";
+  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
   public static final String MAX_FILES_PER_REGION_PER_FAMILY
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
@@ -133,6 +135,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private UserProvider userProvider;
   private int nrThreads;
   private RpcControllerFactory rpcControllerFactory;
+  private AtomicInteger numRetries;
 
   private Map<LoadQueueItem, ByteBuffer> retValue = null;
 
@@ -158,6 +161,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     nrThreads = conf.getInt("hbase.loadincremental.threads.max",
       Runtime.getRuntime().availableProcessors());
     initalized = true;
+    numRetries = new AtomicInteger(1);
   }
 
   private void usage() {
@@ -510,6 +514,69 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     return item2RegionMap;
   }
 
+  protected ClientServiceCallable<byte[]> buildClientServiceCallable(final Connection conn,
+      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
+    for (LoadQueueItem lqi : lqis) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+    }
+
+    return new ClientServiceCallable<byte[]>(conn,
+        tableName, first, rpcControllerFactory.newController()) {
+      @Override
+      protected byte[] rpcCall() throws Exception {
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+        try {
+          LOG.debug("Going to connect to server " + getLocation() + " for row "
+              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+          }
+          return success ? regionName : null;
+        } finally {
+          // Best effort: copy files that might not have been imported
+          // from the staging directory back to their original location
+          // in the user directory.
+          if (secureClient != null && !success) {
+            FileSystem targetFs = FileSystem.get(getConf());
+            // fs is the source filesystem
+            if (fs == null) {
+              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+            }
+            // Check to see if the source and target filesystems are the same
+            // If they are the same filesystem, we will try move the files back
+            // because previously we moved them to the staging directory.
+            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
+              for (Pair<byte[], String> el : famPaths) {
+                Path hfileStagingPath = null;
+                Path hfileOrigPath = new Path(el.getSecond());
+                try {
+                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
+                      hfileOrigPath.getName());
+                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  } else if (targetFs.exists(hfileStagingPath)) {
+                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                        hfileStagingPath);
+                  }
+                } catch (Exception ex) {
+                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
+                      hfileStagingPath, ex);
+                }
+              }
+            }
+          }
+        }
+      }
+    };
+  }
+
   /**
    * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
    * passed directory and validates whether the prepared queue has all the valid table column
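
Pulling the callable construction out into this protected factory method is
what lets tests swap in a different connection, or a callable that fails on
demand. A hedged sketch of the injection pattern (test-style, assuming conn,
tableName, first, lqis and a Configuration conf are in scope, as in the test
further below; the failure message is illustrative):

    // Replace the real callable with one whose rpcCall() always throws, so
    // the retry path in tryAtomicRegionLoad() is exercised deterministically.
    ClientServiceCallable<byte[]> failing = new ClientServiceCallable<byte[]>(
        conn, tableName, first, new RpcControllerFactory(conf).newController()) {
      @Override
      public byte[] rpcCall() throws Exception {
        throw new IOException("injected failure");
      }
    };
    List<LoadQueueItem> toRetry = tryAtomicRegionLoad(failing, tableName, first, lqis);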
@@ -655,11 +722,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       final byte[] first = e.getKey().array();
       final Collection<LoadQueueItem> lqis =  e.getValue();
 
+      final ClientServiceCallable<byte[]> serviceCallable =
+          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
+
       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
         @Override
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
+              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
           return toRetry;
         }
       };
@@ -946,75 +1016,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @return empty list if success, list of items to retry on recoverable
    *   failure
    */
-  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
-      boolean copyFile) throws IOException {
+  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
+      throws IOException {
     final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
         famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
       }
     }
-    final ClientServiceCallable<byte[]> svrCallable = new ClientServiceCallable<byte[]>(conn,
-        tableName, first, rpcControllerFactory.newController()) {
-      @Override
-      protected byte[] rpcCall() throws Exception {
-        SecureBulkLoadClient secureClient = null;
-        boolean success = false;
-        try {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
-              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(getConf(), table);
-            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                  assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
-          }
-          return success ? regionName : null;
-        } finally {
-          //Best effort copying of files that might not have been imported
-          //from the staging directory back to original location
-          //in user directory
-          if (secureClient != null && !success) {
-            FileSystem targetFs = FileSystem.get(getConf());
-         // fs is the source filesystem
-            if(fs == null) {
-              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
-            }
-            // Check to see if the source and target filesystems are the same
-            // If they are the same filesystem, we will try move the files back
-            // because previously we moved them to the staging directory.
-            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
-              for(Pair<byte[], String> el : famPaths) {
-                Path hfileStagingPath = null;
-                Path hfileOrigPath = new Path(el.getSecond());
-                try {
-                  hfileStagingPath= new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
-                    hfileOrigPath.getName());
-                  if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
-                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  } else if(targetFs.exists(hfileStagingPath)){
-                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  }
-                } catch(Exception ex) {
-                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                      hfileStagingPath, ex);
-                }
-              }
-            }
-          }
-        }
-      }
-    };
-
     try {
       List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
       byte[] region = RpcRetryingCallerFactory.instantiate(conf,
           null).<byte[]> newCaller()
-          .callWithRetries(svrCallable, Integer.MAX_VALUE);
+          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
       if (region == null) {
         LOG.warn("Attempt to bulk load region containing "
             + Bytes.toStringBinary(first) + " into table "
@@ -1026,7 +1042,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       return toRetry;
     } catch (IOException e) {
       LOG.error("Encountered unrecoverable error from region server, additional details: "
-          + svrCallable.getExceptionMessageAdditionalDetail(), e);
+          + serviceCallable.getExceptionMessageAdditionalDetail(), e);
       throw e;
     }
   }
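
Note that callWithRetries() above already retries transient RPC-level
failures; the numRetries counter and RETRY_ON_IO_EXCEPTION flag declared
earlier bound an additional pass when an IOException escapes that loop. The
code consulting them is not part of this excerpt; purely as an illustration
(retryOrRethrow is a hypothetical helper, not from this commit), a bounded
retry of that shape could look like:

    // Hypothetical helper, for illustration only; not the committed code.
    // Assumes the surrounding LoadIncrementalHFiles fields (numRetries, LOG)
    // and getConf() are in scope.
    private List<LoadQueueItem> retryOrRethrow(IOException e,
        Collection<LoadQueueItem> lqis) throws IOException {
      int max = getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
          && numRetries.getAndIncrement() < max) {
        LOG.warn("Bulk load attempt failed, re-queueing items for retry", e);
        return new ArrayList<>(lqis);
      }
      throw e;
    }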

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7d0f15c/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 8337da8..a0bac77 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,7 +58,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -275,31 +276,34 @@ public class TestLoadIncrementalHFilesSplitRecovery {
    */
   @Test(expected=IOException.class, timeout=120000)
   public void testBulkLoadPhaseFailure() throws Exception {
-    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
+    final TableName table = TableName.valueOf("bulkLoadPhaseFailure");
     final AtomicInteger attmptedCalls = new AtomicInteger();
     final AtomicInteger failedCalls = new AtomicInteger();
     util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
+    try (Connection connection = ConnectionFactory.createConnection(util
+        .getConfiguration())) {
       setupTable(connection, table, 10);
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+          util.getConfiguration()) {
         @Override
-        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
-            boolean copyFile) throws IOException {
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+            Collection<LoadQueueItem> lqis) throws IOException {
           int i = attmptedCalls.incrementAndGet();
           if (i == 1) {
             Connection errConn;
             try {
               errConn = getMockedConnection(util.getConfiguration());
+              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
             } catch (Exception e) {
               LOG.fatal("mocking cruft, should never happen", e);
               throw new RuntimeException("mocking cruft, should never happen");
             }
             failedCalls.incrementAndGet();
-            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
+            return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
           }
 
-          return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
+          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
         }
       };
       try {
@@ -318,6 +322,53 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     }
   }
 
+  /**
+   * Test that shows that an exception thrown from the RS side will result in the
+   * expected number of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
+   * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set.
+   */
+  @Test
+  public void testRetryOnIOException() throws Exception {
+    final TableName table = TableName.valueOf("retryOnIOException");
+    final AtomicInteger calls = new AtomicInteger(1);
+    final Connection conn = ConnectionFactory.createConnection(util
+        .getConfiguration());
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
+        util.getConfiguration()) {
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(
+          ClientServiceCallable<byte[]> serverCallable, TableName tableName,
+          final byte[] first, Collection<LoadQueueItem> lqis)
+          throws IOException {
+        if (calls.getAndIncrement() < util.getConfiguration().getInt(
+            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
+          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
+              conn, tableName, first, new RpcControllerFactory(
+                  util.getConfiguration()).newController()) {
+            @Override
+            public byte[] rpcCall() throws Exception {
+              throw new IOException("Error calling something on RegionServer");
+            }
+          };
+          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+        } else {
+          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+        }
+      }
+    };
+    setupTable(conn, table, 10);
+    Path dir = buildBulkFiles(table, 1);
+    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
+        conn.getRegionLocator(table));
+    util.getConfiguration().setBoolean(
+        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+  }
+
   @SuppressWarnings("deprecation")
   private ClusterConnection getMockedConnection(final Configuration conf)
   throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
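
For completeness, a hedged end-to-end sketch modeled on the
testRetryOnIOException test above (the table name and HFile path are
illustrative; the flag string mirrors the package-private constant
LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadWithRetry {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same key as LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION.
        conf.setBoolean("hbase.bulkload.retries.retryOnIOException", true);
        TableName table = TableName.valueOf("mytable");  // illustrative
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
          LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
          // Directory layout as produced by HFileOutputFormat2 (illustrative path).
          loader.doBulkLoad(new Path("/tmp/hfiles"), conn.getAdmin(),
              conn.getTable(table), conn.getRegionLocator(table));
        }
      }
    }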