You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/14 12:52:34 UTC

[hbase] branch HBASE-26233 updated: HBASE-26449 The way we add or clear failedReplicas may have race (#3846)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-26233 by this push:
     new 7de82bb  HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
7de82bb is described below

commit 7de82bb4bcd5ae45ffd811556b8bfe021409a280
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Nov 14 20:52:04 2021 +0800

    HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
---
 .../regionreplication/RegionReplicationSink.java   |  28 ++++-
 .../TestRegionReplicationSink.java                 | 130 ++++++++++++++++++++-
 2 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9c6f6e2..68aa508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -30,6 +30,7 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,7 +124,7 @@ public class RegionReplicationSink {
 
   // used to track the replicas which we failed to replicate edits to them
   // will be cleared after we get a flush edit.
-  private final Set<Integer> failedReplicas = new HashSet<>();
+  private final IntHashSet failedReplicas = new IntHashSet();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -135,6 +136,8 @@ public class RegionReplicationSink {
 
   private volatile long pendingSize;
 
+  private long lastFlushSequenceNumber;
+
   private boolean sending;
 
   private boolean stopping;
@@ -162,8 +165,10 @@ public class RegionReplicationSink {
 
   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
+    long maxSequenceId = Long.MIN_VALUE;
     long toReleaseSize = 0;
     for (SinkEntry entry : sent) {
+      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
       entry.replicated();
       toReleaseSize += entry.size;
     }
@@ -173,9 +178,20 @@ public class RegionReplicationSink {
       Integer replicaId = entry.getKey();
       Throwable error = entry.getValue().getValue();
       if (error != null) {
-        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
-          " for a while and trigger a flush", replicaId, primary, error);
-        failed.add(replicaId);
+        if (maxSequenceId > lastFlushSequenceNumber) {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+            " id of sunk entries is {}, which is greater than the last flush SN {}," +
+              " we will stop replicating for a while and trigger a flush",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+          failed.add(replicaId);
+        } else {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entries is {}, which is less than or equal to the last flush SN {}," +
+              " we will not stop replicating",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+        }
       }
     }
     synchronized (entries) {
@@ -215,6 +231,9 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      if (failedReplicas.contains(replicaId)) {
+        continue;
+      }
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -292,6 +311,7 @@ public class RegionReplicationSink {
                   break;
                 }
               }
+              lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
               failedReplicas.clear();
               LOG.debug(
                 "Got a flush all request with sequence id {}, clear failed replicas {}" +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 19b1698..248cdba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -28,11 +28,19 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableNameTestRule;
@@ -45,14 +53,20 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestRegionReplicationSink {
 
@@ -72,6 +86,8 @@ public class TestRegionReplicationSink {
 
   private RegionReplicationBufferManager manager;
 
+  private RegionReplicationSink sink;
+
   @Rule
   public final TableNameTestRule name = new TableNameTestRule();
 
@@ -84,15 +100,17 @@ public class TestRegionReplicationSink {
     flushRequester = mock(Runnable.class);
     conn = mock(AsyncClusterConnection.class);
     manager = mock(RegionReplicationBufferManager.class);
+    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
   }
 
-  private RegionReplicationSink create() {
-    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  @After
+  public void tearDown() throws InterruptedException {
+    sink.stop();
+    sink.waitUntilStopped();
   }
 
   @Test
   public void testNormal() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -129,7 +147,6 @@ public class TestRegionReplicationSink {
 
   @Test
   public void testDropEdits() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -191,4 +208,109 @@ public class TestRegionReplicationSink {
     // replicas
     verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
   }
+
+  @Test
+  public void testNotAddToFailedReplicas() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(3L);
+
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // the failure should not cause replica 2 to be added to failedReplicas, as we have already
+    // triggered a flush after it.
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // should have sent out all, so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testAddToFailedReplica() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(1L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // we should only call replicate once for edit2, since replica 2 is marked as failed
+    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(2).complete(null);
+    // should have sent out all, so no pending entries.
+    assertEquals(0, sink.pendingSize());
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key3, edit3, rpcCall3);
+
+    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
+    // replicas again
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(3).complete(null);
+    futures.get(4).complete(null);
+
+    // should have sent out all, so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
 }