You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/23 14:37:14 UTC
[hbase] 04/08: HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit fbc7bdbe348c03be4efc934e043a2b8b676ca3ed
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Nov 14 20:52:04 2021 +0800
HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
Signed-off-by: Xin Sun <dd...@gmail.com>
---
.../regionreplication/RegionReplicationSink.java | 28 ++++-
.../TestRegionReplicationSink.java | 130 ++++++++++++++++++++-
2 files changed, 150 insertions(+), 8 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9c6f6e2..68aa508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -30,6 +30,7 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -123,7 +124,7 @@ public class RegionReplicationSink {
// used to track the replicas which we failed to replicate edits to them
// will be cleared after we get a flush edit.
- private final Set<Integer> failedReplicas = new HashSet<>();
+ private final IntHashSet failedReplicas = new IntHashSet();
private final Queue<SinkEntry> entries = new ArrayDeque<>();
@@ -135,6 +136,8 @@ public class RegionReplicationSink {
private volatile long pendingSize;
+ private long lastFlushSequenceNumber;
+
private boolean sending;
private boolean stopping;
@@ -162,8 +165,10 @@ public class RegionReplicationSink {
private void onComplete(List<SinkEntry> sent,
Map<Integer, MutableObject<Throwable>> replica2Error) {
+ long maxSequenceId = Long.MIN_VALUE;
long toReleaseSize = 0;
for (SinkEntry entry : sent) {
+ maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
entry.replicated();
toReleaseSize += entry.size;
}
@@ -173,9 +178,20 @@ public class RegionReplicationSink {
Integer replicaId = entry.getKey();
Throwable error = entry.getValue().getValue();
if (error != null) {
- LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
- " for a while and trigger a flush", replicaId, primary, error);
- failed.add(replicaId);
+ if (maxSequenceId > lastFlushSequenceNumber) {
+ LOG.warn(
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is greater than the last flush SN {}," +
+ " we will stop replicating for a while and trigger a flush",
+ replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+ failed.add(replicaId);
+ } else {
+ LOG.warn(
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+ " we will not stop replicating",
+ replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+ }
}
}
synchronized (entries) {
@@ -215,6 +231,9 @@ public class RegionReplicationSink {
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+ if (failedReplicas.contains(replicaId)) {
+ continue;
+ }
MutableObject<Throwable> error = new MutableObject<>();
replica2Error.put(replicaId, error);
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -292,6 +311,7 @@ public class RegionReplicationSink {
break;
}
}
+ lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
failedReplicas.clear();
LOG.debug(
"Got a flush all request with sequence id {}, clear failed replicas {}" +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 19b1698..248cdba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -28,11 +28,19 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNameTestRule;
@@ -45,14 +53,20 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+
@Category({ RegionServerTests.class, MediumTests.class })
public class TestRegionReplicationSink {
@@ -72,6 +86,8 @@ public class TestRegionReplicationSink {
private RegionReplicationBufferManager manager;
+ private RegionReplicationSink sink;
+
@Rule
public final TableNameTestRule name = new TableNameTestRule();
@@ -84,15 +100,17 @@ public class TestRegionReplicationSink {
flushRequester = mock(Runnable.class);
conn = mock(AsyncClusterConnection.class);
manager = mock(RegionReplicationBufferManager.class);
+ sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
}
- private RegionReplicationSink create() {
- return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+ @After
+ public void tearDown() throws InterruptedException {
+ sink.stop();
+ sink.waitUntilStopped();
}
@Test
public void testNormal() {
- RegionReplicationSink sink = create();
MutableInt next = new MutableInt(0);
List<CompletableFuture<Void>> futures =
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -129,7 +147,6 @@ public class TestRegionReplicationSink {
@Test
public void testDropEdits() {
- RegionReplicationSink sink = create();
MutableInt next = new MutableInt(0);
List<CompletableFuture<Void>> futures =
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -191,4 +208,109 @@ public class TestRegionReplicationSink {
// replicas
verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
}
+
+ @Test
+ public void testNotAddToFailedReplicas() {
+ MutableInt next = new MutableInt(0);
+ List<CompletableFuture<Void>> futures =
+ Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
+ when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+ .then(i -> futures.get(next.getAndIncrement()));
+
+ ServerCall<?> rpcCall1 = mock(ServerCall.class);
+ WALKeyImpl key1 = mock(WALKeyImpl.class);
+ when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+ when(key1.getSequenceId()).thenReturn(1L);
+ WALEdit edit1 = mock(WALEdit.class);
+ when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+ when(manager.increase(anyLong())).thenReturn(true);
+ sink.add(key1, edit1, rpcCall1);
+
+ ServerCall<?> rpcCall2 = mock(ServerCall.class);
+ WALKeyImpl key2 = mock(WALKeyImpl.class);
+ when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+ when(key2.getSequenceId()).thenReturn(3L);
+
+ Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+ .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+ FlushDescriptor fd =
+ ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+ WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
+ sink.add(key2, edit2, rpcCall2);
+
+ // fail the call to replica 2
+ futures.get(0).complete(null);
+ futures.get(1).completeExceptionally(new IOException("inject error"));
+
+ // the failure should not cause replica 2 to be added to failedReplicas, as we have already
+ // triggered a flush after it.
+ verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+ futures.get(2).complete(null);
+ futures.get(3).complete(null);
+
+ // everything should have been sent out, so there are no pending entries.
+ assertEquals(0, sink.pendingSize());
+ }
+
+ @Test
+ public void testAddToFailedReplica() {
+ MutableInt next = new MutableInt(0);
+ List<CompletableFuture<Void>> futures =
+ Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+ when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+ .then(i -> futures.get(next.getAndIncrement()));
+
+ ServerCall<?> rpcCall1 = mock(ServerCall.class);
+ WALKeyImpl key1 = mock(WALKeyImpl.class);
+ when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+ when(key1.getSequenceId()).thenReturn(1L);
+ WALEdit edit1 = mock(WALEdit.class);
+ when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+ when(manager.increase(anyLong())).thenReturn(true);
+ sink.add(key1, edit1, rpcCall1);
+
+ ServerCall<?> rpcCall2 = mock(ServerCall.class);
+ WALKeyImpl key2 = mock(WALKeyImpl.class);
+ when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+ when(key2.getSequenceId()).thenReturn(1L);
+ WALEdit edit2 = mock(WALEdit.class);
+ when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+ when(manager.increase(anyLong())).thenReturn(true);
+ sink.add(key2, edit2, rpcCall2);
+
+ // fail the call to replica 2
+ futures.get(0).complete(null);
+ futures.get(1).completeExceptionally(new IOException("inject error"));
+
+ // we should only call replicate once for edit2, since replica 2 is marked as failed
+ verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+ futures.get(2).complete(null);
+ // everything should have been sent out, so there are no pending entries.
+ assertEquals(0, sink.pendingSize());
+
+ ServerCall<?> rpcCall3 = mock(ServerCall.class);
+ WALKeyImpl key3 = mock(WALKeyImpl.class);
+ when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+ when(key3.getSequenceId()).thenReturn(3L);
+ Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+ .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+ FlushDescriptor fd =
+ ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+ WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+ sink.add(key3, edit3, rpcCall3);
+
+ // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
+ // replicas again
+ verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+ futures.get(3).complete(null);
+ futures.get(4).complete(null);
+
+ // everything should have been sent out, so there are no pending entries.
+ assertEquals(0, sink.pendingSize());
+ }
}