You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/27 13:53:19 UTC

[hbase] 08/12: HBASE-26456 Limit the size for one replicating (#3873)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 472dc23cc36e53df1eddfc090d14327911297439
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 23 18:40:21 2021 +0800

    HBASE-26456 Limit the size for one replicating (#3873)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../regionreplication/RegionReplicationSink.java   | 35 +++++++--
 .../TestRegionReplicationSink.java                 | 87 ++++++++++++++++++++++
 2 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index f0129b7..6c83f15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -77,6 +77,14 @@ public class RegionReplicationSink {
 
   public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
 
+  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
+
+  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;
+
+  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";
+
+  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;
+
   private static final class SinkEntry {
 
     final WALKeyImpl key;
@@ -139,6 +147,10 @@ public class RegionReplicationSink {
 
   private final long operationTimeoutNs;
 
+  private final long batchSizeCapacity;
+
+  private final long batchCountCapacity;
+
   private volatile long pendingSize;
 
   private long lastFlushedSequenceId;
@@ -166,6 +178,8 @@ public class RegionReplicationSink {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
+    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
     this.failedReplicas = new IntHashSet(regionReplication - 1);
   }
 
@@ -186,16 +200,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is greater than the last flush SN {}," +
-              " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is greater than the last flush SN {},"
+              + " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
-              " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+              + " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -220,12 +234,17 @@ public class RegionReplicationSink {
 
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
+    long totalSize = 0L;
     for (SinkEntry entry;;) {
       entry = entries.poll();
       if (entry == null) {
         break;
       }
       toSend.add(entry);
+      totalSize += entry.size;
+      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
+        break;
+      }
     }
     int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
     if (toSendReplicaCount <= 0) {
@@ -327,8 +346,8 @@ public class RegionReplicationSink {
             long clearedSize = clearAllEntries();
             if (LOG.isDebugEnabled()) {
               LOG.debug(
-                "Got a flush all request with sequence id {}, clear {} pending" +
-                  " entries with size {}, clear failed replicas {}",
+                "Got a flush all request with sequence id {}, clear {} pending"
+                  + " entries with size {}, clear failed replicas {}",
                 flushSequenceNumber, clearedCount,
                 StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
                 failedReplicas);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 918f644..e065709 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -94,6 +94,8 @@ public class TestRegionReplicationSink {
   @Before
   public void setUp() {
     conf = HBaseConfiguration.create();
+    conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
+    conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);
     td = TableDescriptorBuilder.newBuilder(name.getTableName())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
     primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
@@ -313,4 +315,89 @@ public class TestRegionReplicationSink {
     // should have send out all so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
+
+  @Test
+  public void testSizeCapacity() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    for (int i = 0; i < 3; i++) {
+      ServerCall<?> rpcCall = mock(ServerCall.class);
+      WALKeyImpl key = mock(WALKeyImpl.class);
+      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+      when(key.getSequenceId()).thenReturn(i + 1L);
+      WALEdit edit = mock(WALEdit.class);
+      when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024);
+      when(manager.increase(anyLong())).thenReturn(true);
+      sink.add(key, edit, rpcCall);
+    }
+    // the first entry is sent out immediately, one replicate call per secondary replica
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the first send (one future per secondary replica)
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    // another batch: only the second entry, whose edit alone exceeds the 1024 * 1024 capacity
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the second send
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // the second batch stopped at the size capacity, so the third entry goes in another batch
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the third send
+    futures.get(4).complete(null);
+    futures.get(5).complete(null);
+
+    // everything has been sent out, so there should be no pending entries
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testCountCapacity() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    for (int i = 0; i < 7; i++) {
+      ServerCall<?> rpcCall = mock(ServerCall.class);
+      WALKeyImpl key = mock(WALKeyImpl.class);
+      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+      when(key.getSequenceId()).thenReturn(i + 1L);
+      WALEdit edit = mock(WALEdit.class);
+      when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
+      when(manager.increase(anyLong())).thenReturn(true);
+      sink.add(key, edit, rpcCall);
+    }
+    // the first entry is sent out immediately, one replicate call per secondary replica
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the first send (one future per secondary replica)
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    // another batch, holding at most 5 (the count capacity) of the 6 remaining entries
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the second send
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // because the count limit is 5, the above send could not include all remaining edits, so we
+    // do another send for the last one
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the third send
+    futures.get(4).complete(null);
+    futures.get(5).complete(null);
+
+    // everything has been sent out, so there should be no pending entries
+    assertEquals(0, sink.pendingSize());
+  }
 }