Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/18 15:26:24 UTC

[hbase] 06/06: HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit aa3d0b49a387e13de0485fbc547439bc641c2597
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Nov 17 23:20:22 2021 +0800

    HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../regionreplication/RegionReplicationSink.java   | 119 +++++++++++++--------
 .../TestRegionReplicationSink.java                 |  87 +++++++++++++++
 2 files changed, 162 insertions(+), 44 deletions(-)
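
In short: the sink used to track failed replicas in an IntHashSet and cleared the whole set whenever a flush-all edit arrived, even if that flush did not cover the edit that had failed. With this change the set becomes a Map<Integer, Long> from replica id to the sequence id of the last failed edit, and a flush-all clears a replica only when its flush sequence number is strictly greater than that id. The rule can be illustrated with a minimal standalone sketch in plain Java (hypothetical numbers borrowed from the new test; the real implementation below iterates with an explicit Iterator so it can log each removal):

    import java.util.HashMap;
    import java.util.Map;

    public class ClearRuleSketch {
      public static void main(String[] args) {
        // replica 1 failed while replicating the edit with sequence id 2
        Map<Integer, Long> failedReplicas = new HashMap<>();
        failedReplicas.put(1, 2L);

        // a flush-all committed at sequence number 1 does not persist the failed
        // edit (only edits with sequence id < 1 are covered), so the replica must
        // stay marked as failed
        long flushSeq = 1L;
        failedReplicas.entrySet().removeIf(e -> e.getValue() < flushSeq);
        System.out.println(failedReplicas); // {1=2}

        // a later flush-all at sequence number 4 does cover the failed edit, so
        // it is safe to resume replicating to replica 1
        long laterFlushSeq = 4L;
        failedReplicas.entrySet().removeIf(e -> e.getValue() < laterFlushSeq);
        System.out.println(failedReplicas); // {}
      }
    }

This is exactly the sequence the new test walks through: replica 1 fails at sequence id 2, a COMMIT_FLUSH at sequence number 1 leaves it marked as failed, and a later COMMIT_FLUSH at sequence number 4 clears it.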

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9095870..d5e2387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,15 +22,16 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,8 +124,11 @@ public class RegionReplicationSink {
   private final AsyncClusterConnection conn;
 
  // used to track the replicas to which we failed to replicate edits
-  // will be cleared after we get a flush edit.
-  private final IntHashSet failedReplicas = new IntHashSet();
+  // the key is the replica id, the value is the sequence id of the last failed edit
+  // when we get a flush all request, we will try to remove a replica from this map; the key
+  // point here is that the flush sequence number must be greater than the failed sequence id,
+  // otherwise we should not remove the replica from this map
+  private final Map<Integer, Long> failedReplicas = new HashMap<>();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -180,16 +184,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is greater than the last flush SN {}," +
-              " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is greater than the last flush SN {},"
+              + " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
-              " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+              + " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -197,7 +201,9 @@ public class RegionReplicationSink {
     synchronized (entries) {
       pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
-        failedReplicas.addAll(failed);
+        for (Integer replicaId : failed) {
+          failedReplicas.put(replicaId, maxSequenceId);
+        }
         flushRequester.requestFlush(maxSequenceId);
       }
       sending = false;
@@ -231,7 +237,7 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-      if (failedReplicas.contains(replicaId)) {
+      if (failedReplicas.containsKey(replicaId)) {
         continue;
       }
       MutableObject<Throwable> error = new MutableObject<>();
@@ -247,7 +253,7 @@ public class RegionReplicationSink {
     }
   }
 
-  private boolean flushAllStores(FlushDescriptor flushDesc) {
+  private boolean isFlushAllStores(FlushDescriptor flushDesc) {
     Set<byte[]> storesFlushed =
       flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
         .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -257,6 +263,24 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
+  private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+      return Optional.empty();
+    }
+    FlushDescriptor flushDesc;
+    try {
+      flushDesc = WALEdit.getFlushDescriptor(metaCell);
+    } catch (IOException e) {
+      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
+      return Optional.empty();
+    }
+    if (flushDesc != null && isFlushAllStores(flushDesc)) {
+      return Optional.of(flushDesc);
+    } else {
+      return Optional.empty();
+    }
+  }
+
   private void clearAllEntries() {
     long toClearSize = 0;
     for (SinkEntry entry : entries) {
@@ -268,6 +292,20 @@ public class RegionReplicationSink {
     manager.decrease(toClearSize);
   }
 
+  private void clearFailedReplica(long flushSequenceNumber) {
+    for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator();
+      iter.hasNext();) {
+      Map.Entry<Integer, Long> entry = iter.next();
+      if (entry.getValue().longValue() < flushSequenceNumber) {
+        LOG.debug(
+          "Got a flush all request with sequence id {}, clear failed replica {}" +
+            " with last failed sequence id {}",
+          flushSequenceNumber, entry.getKey(), entry.getValue());
+        iter.remove();
+      }
+    }
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
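
Two details of clearFailedReplica are worth noting. The comparison is strictly less-than, so a flush whose sequence number merely equals the last failed sequence id does not clear the replica; as the field comment above says, the flush sequence number must be greater than the failed sequence id before the failed edit is covered. And the method walks the map with an explicit Iterator rather than a bulk removal, which lets it log each replica as it is removed.
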
@@ -287,41 +325,34 @@ public class RegionReplicationSink {
         // check whether we flushed all stores, which means we could drop all the previous edits,
         // and also, recover from the previous failure of some replicas
         for (Cell metaCell : edit.getCells()) {
-          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
-            FlushDescriptor flushDesc;
-            try {
-              flushDesc = WALEdit.getFlushDescriptor(metaCell);
-            } catch (IOException e) {
-              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
-              continue;
-            }
-            if (flushDesc != null && flushAllStores(flushDesc)) {
-              long flushedSequenceId = flushDesc.getFlushSequenceNumber();
-              int toClearCount = 0;
-              long toClearSize = 0;
-              for (;;) {
-                SinkEntry e = entries.peek();
-                if (e == null) {
-                  break;
-                }
-                if (e.key.getSequenceId() < flushedSequenceId) {
-                  entries.poll();
-                  toClearCount++;
-                  toClearSize += e.size;
-                } else {
-                  break;
-                }
+          getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
+            int toClearCount = 0;
+            long toClearSize = 0;
+            for (;;) {
+              SinkEntry e = entries.peek();
+              if (e == null) {
+                break;
               }
-              lastFlushedSequenceId = flushedSequenceId;
-              failedReplicas.clear();
+              if (e.key.getSequenceId() < flushSequenceNumber) {
+                entries.poll();
+                toClearCount++;
+                toClearSize += e.size;
+              } else {
+                break;
+              }
+            }
+            lastFlushedSequenceId = flushSequenceNumber;
+            if (LOG.isDebugEnabled()) {
               LOG.debug(
-                "Got a flush all request with sequence id {}, clear failed replicas {}" +
-                  " and {} pending entries with size {}",
-                flushedSequenceId, failedReplicas, toClearCount,
+                "Got a flush all request with sequence id {}, clear {} pending"
+                  + " entries with size {}",
+                flushSequenceNumber, toClearCount,
                 StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
-              flushRequester.recordFlush(flushedSequenceId);
             }
-          }
+            clearFailedReplica(flushSequenceNumber);
+            flushRequester.recordFlush(flushSequenceNumber);
+          });
         }
       }
       if (failedReplicas.size() == regionReplication - 1) {
@@ -340,7 +371,7 @@ public class RegionReplicationSink {
         // failed
         clearAllEntries();
         for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-          failedReplicas.add(replicaId);
+          failedReplicas.put(replicaId, entry.key.getSequenceId());
         }
         flushRequester.requestFlush(entry.key.getSequenceId());
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 248cdba..76a224b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -313,4 +313,91 @@ public class TestRegionReplicationSink {
     // should have sent out all, so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
+
+  @Test
+  public void testNotClearFailedReplica() {
+    // simulate this scenario:
+    // 1. prepare flush
+    // 2. add an edit whose replication to one replica fails
+    // 3. commit flush with a flush sequence number less than the sequence id of the previous
+    // edit (this is the normal case)
+    // we should not clear the failed replica, as this flush does not persist the broken edit;
+    // an extra flush is needed to flush it out
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key1, edit1, rpcCall1);
+
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(2L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 1
+    futures.get(2).completeExceptionally(new IOException("inject error"));
+    futures.get(3).complete(null);
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd3 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
+    sink.add(key3, edit3, rpcCall3);
+
+    // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
+    // flush request cannot clear the failed replica because the flush sequence number is not
+    // greater than the sequence id of the last failed edit
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(4).complete(null);
+
+    ServerCall<?> rpcCall4 = mock(ServerCall.class);
+    WALKeyImpl key4 = mock(WALKeyImpl.class);
+    when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
+    when(key4.getSequenceId()).thenReturn(4L);
+    WALEdit edit4 = mock(WALEdit.class);
+    when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
+    sink.add(key4, edit4, rpcCall4);
+
+    // still, only send to replica 2
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(5).complete(null);
+
+    ServerCall<?> rpcCall5 = mock(ServerCall.class);
+    WALKeyImpl key5 = mock(WALKeyImpl.class);
+    when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key5.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd5 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
+    WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
+    sink.add(key5, edit5, rpcCall5);
+
+    futures.get(6).complete(null);
+    futures.get(7).complete(null);
+    // should have cleared the failed replica because the flush sequence number is greater than
+    // the sequence id of the last failed edit
+    verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
 }