Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/07/23 17:19:25 UTC

[GitHub] [hbase] virajjasani commented on a change in pull request #2127: HBASE-24757 : ReplicationSink should limit row size in batch mutation based on hbase.rpc.rows.warning.threshold

virajjasani commented on a change in pull request #2127:
URL: https://github.com/apache/hbase/pull/2127#discussion_r459605416



##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
##########
@@ -1583,6 +1583,16 @@
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
 
+  /**
+   * Number of rows in a batch operation above which a warning will be logged.
+   */
+  public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
+
+  /**
+   * Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
+   */
+  public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+

Review comment:
       We want both `RSRpcServices` and `ReplicationSink` to use these constants. Would you still recommend keeping duplicate copies?
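       To illustrate why a single shared definition helps, here is a minimal sketch of one lookup that both consumers could call. It assumes a plain `java.util.Properties` object stands in for Hadoop's `Configuration`. The class name and the `readThreshold`/`exceedsThreshold` helpers are hypothetical; only the key and the default value mirror the `HConstants` diff above.

```java
import java.util.Properties;

public class BatchRowsThresholdSketch {
  // Mirrors the key and default added to HConstants in this PR.
  static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
  static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

  // Hypothetical helper: reads the threshold, falling back to the default when unset.
  // Properties is only a stand-in for Hadoop's Configuration here.
  static int readThreshold(Properties conf) {
    String value = conf.getProperty(BATCH_ROWS_THRESHOLD_NAME);
    return value == null ? BATCH_ROWS_THRESHOLD_DEFAULT : Integer.parseInt(value.trim());
  }

  // Hypothetical check: "above which a warning will be logged" means strictly greater.
  static boolean exceedsThreshold(int rowCount, int threshold) {
    return rowCount > threshold;
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    System.out.println(readThreshold(conf)); // default applies when the key is unset
    conf.setProperty(BATCH_ROWS_THRESHOLD_NAME, "100");
    int threshold = readThreshold(conf);
    System.out.println(exceedsThreshold(150, threshold));
  }
}
```

       With one definition, a later change to the key name or the default cannot leave `RSRpcServices` and `ReplicationSink` out of sync.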

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
##########
@@ -403,13 +398,24 @@ public void stopReplicationSinkServices() {
    * Do the changes and handle the pool
    * @param tableName table to insert into
    * @param allRows list of actions
+   * @param batchRowSizeThreshold rowSize threshold for batch mutation
    */
-  private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
+  private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
+      throws IOException {
     if (allRows.isEmpty()) {
       return;
     }
     AsyncTable<?> table = getConnection().getTable(tableName);
-    List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+    List<Future<?>> futures = new ArrayList<>();
+    for (List<Row> rows : allRows) {
+      List<List<Row>> batchRows;
+      if (rows.size() > batchRowSizeThreshold) {
+        batchRows = Lists.partition(rows, batchRowSizeThreshold);
+      } else {
+        batchRows = Collections.singletonList(rows);
+      }
+      futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
+    }

Review comment:
       That's true; they are handled in the next iteration.
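       For readers without Guava at hand, here is a self-contained sketch of the splitting step in `batch(...)` above. It assumes the semantics of Guava's `Lists.partition` (consecutive sublists of at most `threshold` rows, with only the last one possibly shorter). The class and method names are hypothetical. In the actual patch, each chunk would then be handed to `table.batchAll`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BatchPartitionSketch {
  // Splits rows into consecutive chunks of at most `threshold` elements,
  // mirroring Lists.partition; lists at or below the threshold stay as one chunk,
  // like the Collections.singletonList branch in the diff.
  static <T> List<List<T>> partition(List<T> rows, int threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    if (rows.size() <= threshold) {
      return Collections.singletonList(rows);
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += threshold) {
      int end = Math.min(start + threshold, rows.size());
      chunks.add(rows.subList(start, end)); // a view, as with Lists.partition
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      rows.add(i);
    }
    // 12 rows with a threshold of 5 yield chunks of 5, 5 and 2 rows.
    for (List<Integer> chunk : partition(rows, 5)) {
      System.out.println(chunk.size());
    }
  }
}
```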

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
##########
@@ -93,15 +94,21 @@
   private SourceFSConfigurationProvider provider;
   private WALEntrySinkFilter walEntrySinkFilter;
 
+  /**
+   * Row size threshold for multi requests above which a warning is logged
+   */
+  private final int rowSizeWarnThreshold;
+
   /**
    * Create a sink for replication
    * @param conf conf object
-   * @param stopper boolean to tell this thread to stop
    * @throws IOException thrown when HDFS goes bad or bad file name
    */
-  public ReplicationSink(Configuration conf, Stoppable stopper)
+  public ReplicationSink(Configuration conf)

Review comment:
       Agreed on the `Stoppable` usage, but here it is unused anyway :(




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org