Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/23 14:37:16 UTC
[hbase] 06/08: HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 8fd503b3db3aea069f26d50af9e1d2b211f530aa
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Nov 17 23:20:22 2021 +0800
HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)
Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
.../regionreplication/RegionReplicationSink.java | 119 +++++++++++++--------
.../TestRegionReplicationSink.java | 87 +++++++++++++++
2 files changed, 162 insertions(+), 44 deletions(-)
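The core of the change: instead of a flat set of failed replica ids that is wiped on any flush-all, the sink now remembers, per replica, the sequence id of the last edit it failed to replicate, and a flush-all may only clear a replica once its flush sequence number exceeds that failed sequence id. A minimal standalone sketch of this bookkeeping follows; FailedReplicaTracker, markFailed, onFlushAll and isFailed are hypothetical names for illustration only, the real logic lives in RegionReplicationSink below.

// Minimal sketch of the new bookkeeping; all names here are hypothetical,
// the actual implementation is in RegionReplicationSink.
import java.util.HashMap;
import java.util.Map;

public class FailedReplicaTracker {

  // replica id -> sequence id of the last edit we failed to replicate to that replica
  private final Map<Integer, Long> failedReplicas = new HashMap<>();

  // record a replication failure at the given sequence id
  public void markFailed(int replicaId, long maxSequenceId) {
    failedReplicas.put(replicaId, maxSequenceId);
  }

  // a flush-all only recovers replicas whose last failed edit is actually covered by
  // the flush, i.e. whose failed sequence id is strictly less than the flush sequence
  // number; otherwise the broken edit is still not flushed out and the replica must
  // stay marked as failed
  public void onFlushAll(long flushSequenceNumber) {
    failedReplicas.values().removeIf(failedSeqId -> failedSeqId < flushSequenceNumber);
  }

  // replication skips replicas that are still marked as failed
  public boolean isFailed(int replicaId) {
    return failedReplicas.containsKey(replicaId);
  }
}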
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9095870..d5e2387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,15 +22,16 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -123,8 +124,11 @@ public class RegionReplicationSink {
private final AsyncClusterConnection conn;
// used to track the replicas which we failed to replicate edits to them
- // will be cleared after we get a flush edit.
- private final IntHashSet failedReplicas = new IntHashSet();
+ // the key is the replica id, the value is the sequence id of the last failed edit
+ // when we get a flush all request, we will try to remove a replica from this map; the key
+ // point here is that the flush sequence number must be greater than the failed sequence id,
+ // otherwise we should not remove the replica from this map
+ private final Map<Integer, Long> failedReplicas = new HashMap<>();
private final Queue<SinkEntry> entries = new ArrayDeque<>();
@@ -180,16 +184,16 @@ public class RegionReplicationSink {
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence" +
- " id of sunk entris is {}, which is greater than the last flush SN {}," +
- " we will stop replicating for a while and trigger a flush",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence"
+ + " id of sunk entris is {}, which is greater than the last flush SN {},"
+ + " we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence" +
- " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
- " we will not stop replicating",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence"
+ + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+ + " we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
@@ -197,7 +201,9 @@ public class RegionReplicationSink {
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
- failedReplicas.addAll(failed);
+ for (Integer replicaId : failed) {
+ failedReplicas.put(replicaId, maxSequenceId);
+ }
flushRequester.requestFlush(maxSequenceId);
}
sending = false;
@@ -231,7 +237,7 @@ public class RegionReplicationSink {
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
- if (failedReplicas.contains(replicaId)) {
+ if (failedReplicas.containsKey(replicaId)) {
continue;
}
MutableObject<Throwable> error = new MutableObject<>();
@@ -247,7 +253,7 @@ public class RegionReplicationSink {
}
}
- private boolean flushAllStores(FlushDescriptor flushDesc) {
+ private boolean isFlushAllStores(FlushDescriptor flushDesc) {
Set<byte[]> storesFlushed =
flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
.collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -257,6 +263,24 @@ public class RegionReplicationSink {
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}
+ private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+ if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+ return Optional.empty();
+ }
+ FlushDescriptor flushDesc;
+ try {
+ flushDesc = WALEdit.getFlushDescriptor(metaCell);
+ } catch (IOException e) {
+ LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
+ return Optional.empty();
+ }
+ if (flushDesc != null && isFlushAllStores(flushDesc)) {
+ return Optional.of(flushDesc);
+ } else {
+ return Optional.empty();
+ }
+ }
+
private void clearAllEntries() {
long toClearSize = 0;
for (SinkEntry entry : entries) {
@@ -268,6 +292,20 @@ public class RegionReplicationSink {
manager.decrease(toClearSize);
}
+ private void clearFailedReplica(long flushSequenceNumber) {
+ for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
+ .hasNext();) {
+ Map.Entry<Integer, Long> entry = iter.next();
+ if (entry.getValue().longValue() < flushSequenceNumber) {
+ LOG.debug(
+ "Got a flush all request with sequence id {}, clear failed replica {}" +
+ " with last failed sequence id {}",
+ flushSequenceNumber, entry.getKey(), entry.getValue());
+ iter.remove();
+ }
+ }
+ }
+
/**
* Add this edit to replication queue.
* <p/>
@@ -287,41 +325,34 @@ public class RegionReplicationSink {
// check whether we flushed all stores, which means we could drop all the previous edits,
// and also, recover from the previous failure of some replicas
for (Cell metaCell : edit.getCells()) {
- if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
- FlushDescriptor flushDesc;
- try {
- flushDesc = WALEdit.getFlushDescriptor(metaCell);
- } catch (IOException e) {
- LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
- continue;
- }
- if (flushDesc != null && flushAllStores(flushDesc)) {
- long flushedSequenceId = flushDesc.getFlushSequenceNumber();
- int toClearCount = 0;
- long toClearSize = 0;
- for (;;) {
- SinkEntry e = entries.peek();
- if (e == null) {
- break;
- }
- if (e.key.getSequenceId() < flushedSequenceId) {
- entries.poll();
- toClearCount++;
- toClearSize += e.size;
- } else {
- break;
- }
+ getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+ long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
+ int toClearCount = 0;
+ long toClearSize = 0;
+ for (;;) {
+ SinkEntry e = entries.peek();
+ if (e == null) {
+ break;
}
- lastFlushedSequenceId = flushedSequenceId;
- failedReplicas.clear();
+ if (e.key.getSequenceId() < flushSequenceNumber) {
+ entries.poll();
+ toClearCount++;
+ toClearSize += e.size;
+ } else {
+ break;
+ }
+ }
+ lastFlushedSequenceId = flushSequenceNumber;
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "Got a flush all request with sequence id {}, clear failed replicas {}" +
- " and {} pending entries with size {}",
- flushedSequenceId, failedReplicas, toClearCount,
+ "Got a flush all request with sequence id {}, clear {} pending"
+ + " entries with size {}",
+ flushSequenceNumber, toClearCount,
StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
- flushRequester.recordFlush(flushedSequenceId);
}
- }
+ clearFailedReplica(flushSequenceNumber);
+ flushRequester.recordFlush(flushSequenceNumber);
+ });
}
}
if (failedReplicas.size() == regionReplication - 1) {
@@ -340,7 +371,7 @@ public class RegionReplicationSink {
// failed
clearAllEntries();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
- failedReplicas.add(replicaId);
+ failedReplicas.put(replicaId, entry.key.getSequenceId());
}
flushRequester.requestFlush(entry.key.getSequenceId());
}
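To make the comparison in clearFailedReplica concrete, here is a short usage example against the hypothetical FailedReplicaTracker sketched above, mirroring the sequence ids exercised by the test below: a replica that failed at sequence id 2 must survive a flush-all with sequence number 1 and may only be cleared by a flush-all with sequence number 4.

// Worked example with the hypothetical FailedReplicaTracker from the sketch above.
FailedReplicaTracker tracker = new FailedReplicaTracker();
tracker.markFailed(1, 2L);    // replica 1 failed at sequence id 2
tracker.onFlushAll(1L);       // flush SN 1 is not greater than 2: the broken edit survives
assert tracker.isFailed(1);   // so replica 1 must stay marked as failed
tracker.onFlushAll(4L);       // flush SN 4 is greater than 2: the failed edit is flushed out
assert !tracker.isFailed(1);  // replica 1 recovers and replication resumes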
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 248cdba..76a224b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -313,4 +313,91 @@ public class TestRegionReplicationSink {
// should have send out all so no pending entries.
assertEquals(0, sink.pendingSize());
}
+
+ @Test
+ public void testNotClearFailedReplica() {
+ // simulate this scenario:
+ // 1. prepare flush
+ // 2. add one edit whose replication to a secondary replica fails
+ // 3. commit flush with a flush sequence number less than that of the previous edit (this is
+ // the normal case)
+ // we should not clear the failed replica, as this flush does not flush the broken edit out;
+ // we need an extra flush to flush it out
+ MutableInt next = new MutableInt(0);
+ List<CompletableFuture<Void>> futures =
+ Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
+ when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+ .then(i -> futures.get(next.getAndIncrement()));
+ when(manager.increase(anyLong())).thenReturn(true);
+
+ ServerCall<?> rpcCall1 = mock(ServerCall.class);
+ WALKeyImpl key1 = mock(WALKeyImpl.class);
+ when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+ when(key1.getSequenceId()).thenReturn(1L);
+ Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+ .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+ throw new IllegalStateException();
+ }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+ FlushDescriptor fd =
+ ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
+ WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
+ sink.add(key1, edit1, rpcCall1);
+
+ futures.get(0).complete(null);
+ futures.get(1).complete(null);
+
+ ServerCall<?> rpcCall2 = mock(ServerCall.class);
+ WALKeyImpl key2 = mock(WALKeyImpl.class);
+ when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+ when(key2.getSequenceId()).thenReturn(2L);
+ WALEdit edit2 = mock(WALEdit.class);
+ when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+ sink.add(key2, edit2, rpcCall2);
+
+ // fail the call to replica 1
+ futures.get(2).completeExceptionally(new IOException("inject error"));
+ futures.get(3).complete(null);
+
+ ServerCall<?> rpcCall3 = mock(ServerCall.class);
+ WALKeyImpl key3 = mock(WALKeyImpl.class);
+ when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
+ when(key3.getSequenceId()).thenReturn(3L);
+ FlushDescriptor fd3 =
+ ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
+ WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
+ sink.add(key3, edit3, rpcCall3);
+
+ // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
+ // flush request cannot clear the failed replica because the flush sequence number is not
+ // greater than the sequence id of the last failed edit
+ verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+ futures.get(4).complete(null);
+
+ ServerCall<?> rpcCall4 = mock(ServerCall.class);
+ WALKeyImpl key4 = mock(WALKeyImpl.class);
+ when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
+ when(key4.getSequenceId()).thenReturn(4L);
+ WALEdit edit4 = mock(WALEdit.class);
+ when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
+ sink.add(key4, edit4, rpcCall4);
+
+ // still, only send to replica 2
+ verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+ futures.get(5).complete(null);
+
+ ServerCall<?> rpcCall5 = mock(ServerCall.class);
+ WALKeyImpl key5 = mock(WALKeyImpl.class);
+ when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
+ when(key5.getSequenceId()).thenReturn(3L);
+ FlushDescriptor fd5 =
+ ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
+ WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
+ sink.add(key5, edit5, rpcCall5);
+
+ futures.get(6).complete(null);
+ futures.get(7).complete(null);
+ // should have cleared the failed replica because the flush sequence number is greater than
+ // the sequence id of the last failed edit
+ verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+ }
}
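Reading the mocked futures in call order gives the following timeline for the test; a region replication of 3 (two secondary replicas) is assumed here, inferred from the replicate call counts rather than stated in the hunk above:

futures 0,1 -> edit1 (START_FLUSH, seq 1) to replicas 1 and 2, both succeed
futures 2,3 -> edit2 (seq 2); future 2 fails, marking replica 1 failed at sequence id 2
future 4    -> edit3 (COMMIT_FLUSH, SN 1) to replica 2 only; SN 1 <= 2 keeps replica 1 failed
future 5    -> edit4 (seq 4) to replica 2 only
futures 6,7 -> edit5 (COMMIT_FLUSH, SN 4) to both replicas; SN 4 > 2 clears replica 1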