Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/10 13:47:32 UTC

[hbase] branch master updated: HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes (#5121)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cea8112fde HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes (#5121)
5cea8112fde is described below

commit 5cea8112fde64c019e7c9ed8f9a4220834276eda
Author: alan.zhao <30...@users.noreply.github.com>
AuthorDate: Wed May 10 21:47:20 2023 +0800

    HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes (#5121)
    
    Co-authored-by: alanzhao <al...@126.com>
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  87 +++++++++++++---
 .../hadoop/hbase/tool/TestBulkLoadHFiles.java      | 114 +++++++++++++++++++--
 2 files changed, 178 insertions(+), 23 deletions(-)
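
The change in a nutshell: when a bulkload HFile straddles a region boundary
and has to be split, each half file is now written with a favored node
pointing at the RegionServer that hosts the target region, resolved from the
first rowkey of the half, so the new blocks stay local to that server. A
minimal sketch of the pattern using the builder calls from the patch
(firstCell and the surrounding setup are illustrative, not patch code):

    // Sketch: resolve the region hosting the first row of the half file and
    // hand its address to the store file writer as a favored node. The
    // fallback to a plain writer when the lookup fails is shown in the diff.
    HRegionLocation location =
      FutureUtils.get(loc.getRegionLocation(CellUtil.cloneRow(firstCell)));
    InetSocketAddress favored =
      new InetSocketAddress(location.getHostname(), location.getPort());
    StoreFileWriter halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withFilePath(outFile).withBloomType(bloomFilterType)
      .withFileContext(hFileContext)
      .withFavoredNodes(new InetSocketAddress[] { favored }).build();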

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index e54de3403e7..9b4e1aea906 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -56,13 +57,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+    "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+
   public static final String NAME = "completebulkload";
   /**
    * Whether to run validation on hfiles before loading.
@@ -540,7 +552,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
-
       final Callable<Pair<List<LoadQueueItem>, String>> call =
         () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
       splittingFutures.add(pool.submit(call));
@@ -578,8 +589,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     return UUID.randomUUID().toString().replaceAll("-", "");
   }
 
-  private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
-    byte[] splitKey) throws IOException {
+  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
+    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
     Path hfilePath = item.getFilePath();
     byte[] family = item.getFamily();
     Path tmpDir = hfilePath.getParent();
@@ -594,7 +605,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
     Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
 
     FileSystem fs = tmpDir.getFileSystem(getConf());
     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -718,8 +730,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
         checkRegionIndexValid(splitIdx, startEndKeys, tableName);
       }
       byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
-      List<LoadQueueItem> lqis =
-        splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+      List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
+        FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+
       return new Pair<>(lqis, null);
     }
 
@@ -729,25 +742,27 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   /**
-   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
-   * filters, etc.
+   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
+   * recreating bloom filters, etc.
    */
   @InterfaceAudience.Private
-  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
-    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
+    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
+    throws IOException {
     // Open reader with no block cache, and not in-memory
     Reference topReference = Reference.createTopReference(splitKey);
     Reference bottomReference = Reference.createBottomReference(splitKey);
 
-    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
-    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
   }
 
   /**
-   * Copy half of an HFile into a new HFile.
+   * Copy half of an HFile into a new HFile with favored nodes.
    */
   private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
-    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
+    throws IOException {
     FileSystem fs = inFile.getFileSystem(conf);
     CacheConfig cacheConf = CacheConfig.DISABLED;
     HalfStoreFileReader halfReader = null;
@@ -769,12 +784,50 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
         .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (halfWriter != null) {
+          halfWriter.append(cell);
+        } else {
+          // Lazily create the half writer on the first cell, so the favored
+          // node can be derived from the region hosting the first rowkey.
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (hRegionLocation == null) {
+              LOG.warn(
+                "Failed to get region location for rowkey {}, using writer without favored nodes.",
+                Bytes.toString(rowKey));
+              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+            } else {
+              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+              InetSocketAddress initialIsa =
+                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+              if (initialIsa.isUnresolved()) {
+                LOG.warn("Failed to resolve location for region {}, using writer without favored nodes.",
+                  hRegionLocation);
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+              } else {
+                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
+                favoredNodes = new InetSocketAddress[] { initialIsa };
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+              }
+            }
+          } else {
+            halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+              .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+          }
+          halfWriter.append(cell);
+        }
+
       } while (scanner.next());
 
       for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
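
The favored-node behaviour is on by default (DEFAULT_LOCALITY_SENSITIVE =
true) and can be switched off through the key added above. A minimal sketch,
assuming only a standard client-side Configuration:

    // Disable favored-node placement for HFile halves created during
    // bulkload splits; writers then fall back to default HDFS block placement.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(BulkLoadHFilesTool.LOCALITY_SENSITIVE_CONF_KEY, false);
    // i.e. "hbase.bulkload.locality.sensitive.enabled" = false
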
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index fecf4c7ec2c..c6cbb6458c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,10 +44,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Table;
@@ -63,7 +66,12 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -555,15 +563,48 @@ public class TestBulkLoadHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
+
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
+  /**
+   * Test HFile splits with favored nodes.
+   */
+  @Test
+  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
 
+    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+
+    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
+    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    final AsyncTableRegionLocator regionLocator =
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
+    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
+      Bytes.toBytes("ggg"), bottomOut, topOut);
+    verifyHFileFavoriteNode(topOut, regionLocator, fs);
+    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
     assertEquals(1000, rowCount);
@@ -575,14 +616,17 @@ public class TestBulkLoadHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     verifyHFileCreateTimeTS(bottomOut);
     verifyHFileCreateTimeTS(topOut);
@@ -615,14 +659,17 @@ public class TestBulkLoadHFiles {
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc =
       ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
       bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
@@ -654,6 +701,61 @@ public class TestBulkLoadHFiles {
     }
   }
 
+  /**
+   * Verify that the blocks of a split store file were placed on the favored node (the region's host).
+   */
+  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
+    throws IOException {
+    Configuration conf = util.getConfiguration();
+
+    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {
+
+      final byte[] firstRowkey = reader.getFirstRowKey().get();
+      final HRegionLocation hRegionLocation =
+        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
+
+      final String targetHostName = hRegionLocation.getHostname();
+
+      if (fs instanceof DistributedFileSystem) {
+        String pathStr = p.toUri().getPath();
+        LocatedBlocks blocks =
+          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
+
+        boolean isFavoriteNode = false;
+        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
+        int index = 0;
+        do {
+          if (index > 0) {
+            assertTrue("failed to use favored nodes", isFavoriteNode);
+          }
+          isFavoriteNode = false;
+          final LocatedBlock block = locatedBlocks.get(index);
+
+          final DatanodeInfo[] locations = block.getLocations();
+          for (DatanodeInfo location : locations) {
+
+            final String hostName = location.getHostName();
+            if (
+              targetHostName.equals(hostName.equals("127.0.0.1")
+                ? InetAddress.getLocalHost().getHostName()
+                : "127.0.0.1") || targetHostName.equals(hostName)
+            ) {
+              isFavoriteNode = true;
+              break;
+            }
+          }
+
+          index++;
+        } while (index < locatedBlocks.size());
+        if (index > 0) {
+          assertTrue("failed to use favored nodes", isFavoriteNode);
+        }
+
+      }
+
+    }
+  }
+
   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
     Integer value = map.containsKey(first) ? map.get(first) : 0;
     map.put(first, value + 1);