Posted to commits@hbase.apache.org by vj...@apache.org on 2020/07/24 15:02:57 UTC

[hbase] branch branch-2 updated: HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6cb51cc  HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)
6cb51cc is described below

commit 6cb51cc0f01f6282360b1e6bc614a0283e42447c
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Jul 24 20:32:40 2020 +0530

    HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold (#2136)
    
    Closes #2127
    
    Signed-off-by: stack <st...@apache.org>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   | 10 ++++
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 12 +----
 .../replication/regionserver/Replication.java      |  2 +-
 .../replication/regionserver/ReplicationSink.java  | 60 ++++++++++++----------
 .../hbase/regionserver/TestMultiLogThreshold.java  |  5 +-
 .../regionserver/TestReplicationSink.java          | 49 +++++++++++++++---
 .../regionserver/TestWALEntrySinkFilter.java       |  2 +-
 7 files changed, 91 insertions(+), 49 deletions(-)
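
The heart of the change: ReplicationSink previously handed each accumulated
row list to Table.batch() in a single call (the removed "table.batch(rows,
null)" line below), so one replicated WAL batch could become an arbitrarily
large multi request on the sink cluster. The patch caps each call at
hbase.rpc.rows.warning.threshold rows by splitting oversized lists with the
shaded Guava Lists.partition. A minimal standalone sketch of the idea
(illustrative names, not the committed code):

    import java.util.List;
    import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

    public class PartitionSketch {
      // Split an oversized row list into threshold-sized chunks; each chunk
      // would then be submitted as its own Table.batch() RPC. The last chunk
      // may be smaller than the threshold.
      static <R> void submitInChunks(List<R> rows, int threshold) {
        for (List<R> chunk : Lists.partition(rows, threshold)) {
          System.out.println("batch of " + chunk.size()); // table.batch(chunk, null)
        }
      }
    }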

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 475989b..1eea08b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1610,6 +1610,16 @@ public final class HConstants {
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
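
Promoting the threshold from RSRpcServices into HConstants lets the RPC layer
and the replication sink read the same setting. A minimal sketch of the
lookup pattern the diff uses (falls back to the 5000-row default when
hbase.rpc.rows.warning.threshold is unset):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    int rowSizeWarnThreshold = conf.getInt(
        HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
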
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cbca1a5..d1ef55c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -290,15 +290,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
 
-  /**
-   * Number of rows in a batch operation above which a warning will be logged.
-   */
-  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
-  /**
-   * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
-   */
-  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
-
   /*
    * Whether to reject rows with size > threshold defined by
    * {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
@@ -1261,7 +1252,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     final Configuration conf = rs.getConfiguration();
     this.ld = ld;
     regionServer = rs;
-    rowSizeWarnThreshold = conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     rejectRowsWithSizeOverThreshold =
       conf.getBoolean(REJECT_BATCH_ROWS_OVER_THRESHOLD, DEFAULT_REJECT_BATCH_ROWS_OVER_THRESHOLD);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 186e67d..cb8e299 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -187,7 +187,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   @Override
   public void startReplicationService() throws IOException {
     this.replicationManager.init();
-    this.replicationSink = new ReplicationSink(this.conf, this.server);
+    this.replicationSink = new ReplicationSink(this.conf);
     this.scheduleThreadPool.scheduleAtFixedRate(
       new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e910e53..b947c80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -51,13 +51,14 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>
@@ -91,15 +92,20 @@ public class ReplicationSink {
   private WALEntrySinkFilter walEntrySinkFilter;
 
   /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
+  /**
    * Create a sink for replication
-   *
-   * @param conf                conf object
-   * @param stopper             boolean to tell this thread to stop
+   * @param conf conf object
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)
       throws IOException {
     this.conf = HBaseConfiguration.create(conf);
+    rowSizeWarnThreshold = conf.getInt(
+      HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     decorateConf();
     this.metrics = new MetricsSink();
     this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -209,11 +215,7 @@ public class ReplicationSink {
               // Map of table name Vs list of pair of family and list of
               // hfile paths from its namespace
               Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-                bulkLoadsPerClusters.get(bld.getClusterIdsList());
-              if (bulkLoadHFileMap == null) {
-                bulkLoadHFileMap = new HashMap<>();
-                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-              }
+                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
               buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
           } else {
@@ -246,7 +248,7 @@ public class ReplicationSink {
       if (!rowMap.isEmpty()) {
         LOG.debug("Started replicating mutations.");
         for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
-          batch(entry.getKey(), entry.getValue().values());
+          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
         }
         LOG.debug("Finished replicating mutations.");
       }
@@ -366,17 +368,10 @@ public class ReplicationSink {
    * @param value
    * @return the list of values corresponding to key1 and key2
    */
-  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
-    Map<K2,List<V>> innerMap = map.get(key1);
-    if (innerMap == null) {
-      innerMap = new HashMap<>();
-      map.put(key1, innerMap);
-    }
-    List<V> values = innerMap.get(key2);
-    if (values == null) {
-      values = new ArrayList<>();
-      innerMap.put(key2, values);
-    }
+  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
+      V value) {
+    Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
+    List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
     values.add(value);
     return values;
   }
@@ -404,9 +399,10 @@ public class ReplicationSink {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
-   * @throws IOException
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+      throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
@@ -415,7 +411,15 @@ public class ReplicationSink {
       Connection connection = getConnection();
       table = connection.getTable(tableName);
       for (List<Row> rows : allRows) {
-        table.batch(rows, null);
+        List<List<Row>> batchRows;
+        if (rows.size() > batchRowSizeThreshold) {
+          batchRows = Lists.partition(rows, batchRowSizeThreshold);
+        } else {
+          batchRows = Collections.singletonList(rows);
+        }
+        for (List<Row> rowList : batchRows) {
+          table.batch(rowList, null);
+        }
       }
     } catch (RetriesExhaustedWithDetailsException rewde) {
       for (Throwable ex : rewde.getCauses()) {
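
Beyond the batching itself, the diff above replaces two get-then-put
sequences with Map.computeIfAbsent, which creates and installs a missing
inner container in one step. A small sketch of the idiom on a nested
multimap (illustrative types, not the committed code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    Map<String, Map<String, List<Integer>>> multiMap = new HashMap<>();
    // First access materializes the inner map and list; later calls reuse them.
    multiMap.computeIfAbsent("k1", k -> new HashMap<>())
        .computeIfAbsent("k2", k -> new ArrayList<>())
        .add(42);
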
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
index abc9564..c58ee17 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -80,8 +81,8 @@ public class TestMultiLogThreshold {
     final TableName tableName = TableName.valueOf("tableName");
     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
     CONF = TEST_UTIL.getConfiguration();
-    THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
-      RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
+    THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
+      HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
     CONF.setBoolean("hbase.rpc.rows.size.threshold.reject", rejectLargeBatchOp);
     TEST_UTIL.startMiniCluster();
     TEST_UTIL.createTable(tableName, TEST_FAM);
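
Note the interplay with hbase.rpc.rows.size.threshold.reject, which the test
toggles above: per the RSRpcServices comment in the diff, that flag controls
whether batches over the threshold are rejected outright rather than just
logged. A hedged sketch of tuning both settings together (the reject flag is
assumed to default to false):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 1000);        // warn/split above 1000 rows
    conf.setBoolean("hbase.rpc.rows.size.threshold.reject", true);  // reject oversized batches
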
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 9145841..57900cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSink {
 
   @ClassRule
@@ -127,10 +126,8 @@ public class TestReplicationSink {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
-
     TEST_UTIL.startMiniCluster(3);
-    SINK =
-      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
+    SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
     Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public class TestReplicationSink {
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
   }
 
+  @Test
+  public void testLargeEditsPutDelete() throws Exception {
+    List<WALEntry> entries = new ArrayList<>();
+    List<Cell> cells = new ArrayList<>();
+    for (int i = 0; i < 5510; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+
+    ResultScanner resultScanner = table1.getScanner(new Scan());
+    int totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5510, totalRows);
+
+    entries = new ArrayList<>();
+    cells = new ArrayList<>();
+    for (int i = 0; i < 11000; i++) {
+      entries.add(
+        createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
+          cells));
+    }
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
+    resultScanner = table1.getScanner(new Scan());
+    totalRows = 0;
+    while (resultScanner.next() != null) {
+      totalRows++;
+    }
+    assertEquals(5500, totalRows);
+  }
+
   /**
    * Insert to 2 different tables
    * @throws Exception
@@ -221,7 +252,11 @@ public class TestReplicationSink {
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
-      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
+      assertEquals(0, Bytes.toInt(res.getRow()) % 2);
+    }
+    scanRes = table1.getScanner(scan);
+    for(Result res : scanRes) {
+      assertEquals(1, Bytes.toInt(res.getRow()) % 2);
     }
   }
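
The expected counts in testLargeEditsPutDelete above follow from the loop
bounds: the first pass puts rows 0..5509 (5510 rows, just over the 5000-row
default, so the sink must split the batch), and the second pass writes 11000
entries where every even index is a DeleteColumn and every odd index a Put,
leaving exactly the 5500 odd-indexed rows. A one-line check of that
arithmetic:

    // odd indices in [0, 11000) -> 5500 surviving rows
    long expected = java.util.stream.IntStream.range(0, 11000)
        .filter(i -> i % 2 != 0).count();   // 5500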
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 39a2789..a3a84fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -125,7 +125,7 @@ public class TestWALEntrySinkFilter {
         IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
         Connection.class);
-    ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
+    ReplicationSink sink = new ReplicationSink(conf);
     // Create some dumb walentries.
     List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
         new ArrayList<>();