Posted to commits@hbase.apache.org by st...@apache.org on 2019/09/14 17:46:35 UTC

[hbase] branch master updated: HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#553)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f31301d  HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#553)
f31301d is described below

commit f31301dcf8e1e5d15a4b9939063e5514d810804b
Author: Yiran Wu <yi...@gmail.com>
AuthorDate: Sun Sep 15 01:46:29 2019 +0800

    HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#553)
    
    Signed-off-by: Sakthi <sa...@apache.org>
---
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 12 ++++-
 ...estReplicationSyncUpToolWithBulkLoadedData.java | 53 ++++++++++++++++++----
 2 files changed, 55 insertions(+), 10 deletions(-)
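
The gist of the change: with space quotas enabled, RSRpcServices sized a
bulk-load batch against the region server's own filesystem, so HFiles staged
on a different HDFS could not be found and the load failed. The patch resolves
the filesystem from the bulk-load paths instead. A minimal standalone sketch
of that idea (the class and method names here are illustrative, not HBase API;
the example URI is made up):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BulkLoadFsSketch {
      // Resolve the filesystem the staged HFiles actually live on.
      static FileSystem resolveFileSystem(FileSystem localFs, Configuration conf,
          List<String> filePaths) throws IOException {
        if (filePaths.isEmpty()) {
          return localFs; // nothing staged; fall back to the local filesystem
        }
        // A fully qualified path such as hdfs://source:8020/staging/f/hfile
        // carries its own scheme and authority, so getFileSystem() returns the
        // source cluster's filesystem rather than the region server's.
        return new Path(filePaths.get(0)).getFileSystem(conf);
      }

      // Size the batch on whichever filesystem the paths resolve to.
      static long sizeOf(FileSystem fs, List<String> filePaths)
          throws IOException {
        long total = 0;
        for (String path : filePaths) {
          total += fs.getFileStatus(new Path(path)).getLen();
        }
        return total;
      }
    }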

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 61354fd..fe1c43a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2404,7 +2405,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             filePaths.add(familyPath.getPath());
           }
           // Check if the batch of files exceeds the current quota
-          sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+          sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
         }
       }
       // secure bulk load
@@ -2492,6 +2493,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // local hdfs
+      return regionServer.getFileSystem();
+    }
+    // source hdfs
+    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   private com.google.protobuf.Message execServiceOnRegion(HRegion region,
       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
     // ignore the passed in controller (from the serialized call)
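
The new getFileSystem() helper leans on Path.getFileSystem(Configuration): a
path that carries an explicit scheme and authority resolves to that cluster's
filesystem, while an unqualified path falls back to fs.defaultFS. Note the
helper resolves a single filesystem from the first path, so all files in one
batch are assumed to live on the same filesystem. A quick illustration (the
cluster URIs are made up):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://local-cluster:8020");

    // Qualified path: resolves to the source cluster's filesystem.
    FileSystem src = new Path("hdfs://source-cluster:8020/staging/hfile")
        .getFileSystem(conf);
    // Unqualified path: resolves to the default (local) filesystem.
    FileSystem local = new Path("/staging/hfile").getFileSystem(conf);

    System.out.println(src.getUri());   // hdfs://source-cluster:8020
    System.out.println(local.getUri()); // hdfs://local-cluster:8020
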
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 5f10ef9..6247c22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -63,6 +64,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   protected void customizeClusterConf(Configuration conf) {
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
   }
@@ -76,11 +78,11 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     setupReplication();
 
     /**
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
     Iterator<String> randomHFileRangeListIterator = null;
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(UTIL1.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
@@ -88,8 +90,9 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     randomHFileRangeListIterator = randomHFileRangeList.iterator();
 
     /**
-     * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
-     * into cf1, and 3 rows into norep verify correctly replicated to slave
+     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3
+     * rows into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1,
+     * and 3 rows into norep verify correctly replicated to slave
      */
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);
 
@@ -170,11 +173,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
       Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");
 
-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
+        hfileRanges, 50);
 
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -182,11 +191,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
       hfileRanges, 3);
 
-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
+        hfileRanges, 100);
 
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -224,6 +239,26 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     loader.bulkLoad(tableName, dir);
   }
 
+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = UTIL2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+          new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+    loader.bulkLoad(tableName, dir);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
       throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
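
The new test method mirrors the production scenario: the HFiles are written on
the second mini cluster's filesystem and then bulk loaded through the first
cluster, with quotas switched on via QuotaUtil.QUOTA_CONF_KEY. Condensed from
the method above (testName and source are its parameters):

    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();  // the "other" HDFS
    dir = dir.makeQualified(fs);                // attach UTIL2's scheme/authority
    // ... HFileTestUtil.createHFile(...) writes the HFiles under dir ...

    // Bulk load through cluster 1: with quotas on, the region server must size
    // the batch against UTIL2's filesystem, resolved from the qualified path.
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(source.getName(), dir);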