Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/09 13:41:56 UTC

[hbase] branch HBASE-26233 updated: HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-26233 by this push:
     new 502c1e3  HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
502c1e3 is described below

commit 502c1e3f56d8afbfa1cfbb1f366390c9bdb66aaf
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 9 21:41:25 2021 +0800

    HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   7 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 147 ++++++++++++++-----
 .../TestRegionReplicaReplicationError.java         | 158 +++++++++++++++++++++
 .../regionserver/TestRegionReplicaReplication.java |   4 +-
 4 files changed, 275 insertions(+), 41 deletions(-)
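
In short: previously the sink combined all replica RPC futures into a single CompletableFuture and, on failure, only logged a warning next to a TODO about dropping pending edits and issuing a flush. With this change it records exactly which secondary replicas failed, stops replicating to them, and asks the primary region to flush; once a flush edit covering every store arrives, the pending entries and the failed-replica set are cleared, since the secondaries can then recover from the flushed files. A minimal sketch of that cycle, using simplified stand-in types rather than the real HBase classes:

import java.util.HashSet;
import java.util.Set;

class FailedReplicaTrackerSketch {

  private final Set<Integer> failedReplicas = new HashSet<>();

  private final Runnable flushRequester;

  FailedReplicaTrackerSketch(Runnable flushRequester) {
    this.flushRequester = flushRequester;
  }

  // called when a replicate call to one secondary replica fails
  synchronized void onReplicateFailure(int replicaId) {
    if (failedReplicas.add(replicaId)) {
      // ask the primary to flush so the replica can later catch up from files
      flushRequester.run();
    }
  }

  // later sends skip replicas we have already given up on
  synchronized boolean shouldSendTo(int replicaId) {
    return !failedReplicas.contains(replicaId);
  }

  // called when a flush edit covering every column family is appended
  synchronized void onFlushAll() {
    // pending edits are obsolete and failed replicas can recover, so forget them
    failedReplicas.clear();
  }
}

The real sink below does the same bookkeeping under synchronized (entries) and folds it into its send/onComplete loop.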

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c9d456b..d3da9f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1104,8 +1104,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
-      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
+      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
+        FlushLifeCycleTracker.DUMMY);
+    }, rss.getAsyncClusterConnection()));
+
   }
 
   /**
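
Note the constructor change above: instead of a replica count and a memstore-replication flag, the sink now receives the full TableDescriptor plus a Runnable that flushes every column family of the primary. Requesting all families matters, because only a flush covering every store will later clear the failure state (see flushAllStores below). Handing over a plain Runnable also keeps RegionReplicationSink free of any direct dependency on FlushRequester and FlushLifeCycleTracker. A hedged sketch of that wiring pattern (FlushCoordinator and requestFlushAllFamilies are invented names for illustration):

import java.util.List;

interface FlushCoordinator {
  void requestFlushAllFamilies(List<String> families);
}

class SinkWiringSketch {

  // Equivalent in spirit to the lambda handed to new RegionReplicationSink(...)
  // above: capture everything the flush needs and expose only a Runnable, so
  // the sink never touches the region server's flush machinery directly.
  static Runnable flushRequesterFor(FlushCoordinator coordinator, List<String> families) {
    return () -> coordinator.requestFlushAllFamilies(families);
  }
}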
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
index 6911289..cdd77e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -17,18 +17,29 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -39,6 +50,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+
 /**
  * The class for replicating WAL edits to secondary replicas, one instance per region.
  */
@@ -97,35 +110,39 @@ public class RegionReplicationSink {
 
   private final RegionInfo primary;
 
-  private final int regionReplication;
-
-  private final boolean hasRegionMemStoreReplication;
+  private final TableDescriptor tableDesc;
 
-  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+  private final Runnable flushRequester;
 
   private final AsyncClusterConnection conn;
 
+  // used to track the replicas to which we failed to replicate edits; it will be
+  // cleared after we receive a flush edit
+  private final Set<Integer> failedReplicas = new HashSet<>();
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
   private final int retries;
 
   private final long rpcTimeoutNs;
 
   private final long operationTimeoutNs;
 
-  private CompletableFuture<Void> future;
+  private boolean sending;
 
   private boolean stopping;
 
   private boolean stopped;
 
-  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
-    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(regionReplication > 1,
-      "region replication should be greater than 1 but got %s", regionReplication);
+    Preconditions.checkArgument(td.getRegionReplication() > 1,
+      "region replication should be greater than 1 but got %s", td.getRegionReplication());
     this.primary = primary;
-    this.regionReplication = regionReplication;
-    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.tableDesc = td;
+    this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
     this.rpcTimeoutNs =
@@ -134,6 +151,36 @@ public class RegionReplicationSink {
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
   }
 
+  private void onComplete(List<SinkEntry> sent,
+    Map<Integer, MutableObject<Throwable>> replica2Error) {
+    sent.forEach(SinkEntry::replicated);
+    Set<Integer> failed = new HashSet<>();
+    for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
+      Integer replicaId = entry.getKey();
+      Throwable error = entry.getValue().getValue();
+      if (error != null) {
+        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
+          " for a while and trigger a flush", replicaId, primary, error);
+        failed.add(replicaId);
+      }
+    }
+    synchronized (entries) {
+      if (!failed.isEmpty()) {
+        failedReplicas.addAll(failed);
+        flushRequester.run();
+      }
+      sending = false;
+      if (stopping) {
+        stopped = true;
+        entries.notifyAll();
+        return;
+      }
+      if (!entries.isEmpty()) {
+        send();
+      }
+    }
+  }
+
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
     for (SinkEntry entry;;) {
@@ -143,32 +190,37 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
     }
+    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    if (toSendReplicaCount <= 0) {
+      return;
+    }
+    sending = true;
     List<WAL.Entry> walEntries =
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
-    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
+    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
+    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+      MutableObject<Throwable> error = new MutableObject<>();
+      replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
-      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+      FutureUtils.addListener(
+        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
+          error.setValue(e);
+          if (remaining.decrementAndGet() == 0) {
+            onComplete(toSend, replica2Error);
+          }
+        });
     }
-    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
-    FutureUtils.addListener(future, (r, e) -> {
-      if (e != null) {
-        // TODO: drop pending edits and issue a flush
-        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
-      }
-      toSend.forEach(SinkEntry::replicated);
-      synchronized (entries) {
-        future = null;
-        if (stopping) {
-          stopped = true;
-          entries.notifyAll();
-          return;
-        }
-        if (!entries.isEmpty()) {
-          send();
-        }
-      }
-    });
+  }
+
+  private boolean flushAllStores(FlushDescriptor flushDesc) {
+    Set<byte[]> storesFlushed =
+      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
+        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
+      return false;
+    }
+    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
   /**
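
The removed code above waited on CompletableFuture.allOf, which completes exceptionally on the first failure and cannot say which replica failed. The replacement gives every replica its own error slot (a MutableObject<Throwable>) and counts down an AtomicInteger so that onComplete runs exactly once, after every replica has answered, with per-replica outcomes. A self-contained demonstration of that pattern, substituting a plain CompletableFuture and a one-element array for FutureUtils and MutableObject, with faked replica RPCs:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class PerReplicaCompletionDemo {

  public static void main(String[] args) {
    int regionReplication = 3;
    AtomicInteger remaining = new AtomicInteger(regionReplication - 1);
    Map<Integer, Throwable[]> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      Throwable[] error = new Throwable[1];
      replica2Error.put(replicaId, error);
      // fake the replicate RPC: replica 1 fails, the others succeed
      CompletableFuture<Void> rpc = replicaId == 1
        ? CompletableFuture.failedFuture(new RuntimeException("inject error"))
        : CompletableFuture.completedFuture(null);
      rpc.whenComplete((r, e) -> {
        // record this replica's outcome; e stays null on success
        error[0] = e;
        if (remaining.decrementAndGet() == 0) {
          // the "onComplete" step: runs exactly once, after every reply is in
          replica2Error.forEach((id, err) -> System.out.println(
            "replica " + id + ": " + (err[0] == null ? "ok" : err[0].getMessage())));
        }
      });
    }
  }
}

Printing replica 1 as failed and replica 2 as ok mirrors what onComplete sees: successes are acknowledged, failures go into failedReplicas.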
@@ -178,7 +230,7 @@ public class RegionReplicationSink {
    * rpc call has cell scanner, which is off heap.
    */
   public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
-    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
       // only replicate meta edit if region memstore replication is not enabled
       return;
     }
@@ -186,10 +238,31 @@ public class RegionReplicationSink {
       if (stopping) {
         return;
       }
+      if (edit.isMetaEdit()) {
+        // check whether we have flushed all stores, which means we can drop all the previous
+        // edits and also recover from the previous failure of some replicas
+        for (Cell metaCell : edit.getCells()) {
+          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+            FlushDescriptor flushDesc;
+            try {
+              flushDesc = WALEdit.getFlushDescriptor(metaCell);
+            } catch (IOException e) {
+              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
+              continue;
+            }
+            if (flushDesc != null && flushAllStores(flushDesc)) {
+              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
+                " replication entries", failedReplicas, entries.size());
+              entries.clear();
+              failedReplicas.clear();
+            }
+          }
+        }
+      }
       // TODO: limit the total cached entries here, and we should have a global limitation, not for
       // only this region.
       entries.add(new SinkEntry(key, edit, rpcCall));
-      if (future == null) {
+      if (!sending) {
         send();
       }
     }
@@ -203,7 +276,7 @@ public class RegionReplicationSink {
   void stop() {
     synchronized (entries) {
       stopping = true;
-      if (future == null) {
+      if (!sending) {
         stopped = true;
         entries.notifyAll();
       }
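
flushAllStores is what makes recovery safe: a flush edit clears failedReplicas and the pending queue only when its FlushDescriptor covers every column family, because an unflushed store could still hold edits that a failed replica never received. Since byte[] does not define value equality, the flushed family names are collected into a TreeSet ordered by Bytes.BYTES_COMPARATOR. A standalone sketch of the same check, with Arrays.compare as a stand-in comparator (Bytes.BYTES_COMPARATOR compares bytes as unsigned, which this sketch glosses over):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class FlushCoverageSketch {

  static boolean flushCoversAllFamilies(List<byte[]> flushedFamilies, List<byte[]> tableFamilies) {
    // byte[] has identity-based equals/hashCode, so use an ordered set with a
    // content-based comparator, as the sink does with Bytes.BYTES_COMPARATOR
    Set<byte[]> flushed = new TreeSet<>(Arrays::compare);
    flushed.addAll(flushedFamilies);
    return flushed.size() == tableFamilies.size() && flushed.containsAll(tableFamilies);
  }

  public static void main(String[] args) {
    byte[] cf1 = "cf1".getBytes(StandardCharsets.UTF_8);
    byte[] cf2 = "cf2".getBytes(StandardCharsets.UTF_8);
    System.out.println(flushCoversAllFamilies(List.of(cf1), List.of(cf1, cf2)));      // false
    System.out.println(flushCoversAllFamilies(List.of(cf1, cf2), List.of(cf1, cf2))); // true
  }
}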
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
new file mode 100644
index 0000000..f88c4a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
+/**
+ * Test region replication when errors occur.
+ * <p/>
+ * We can not simply move the secondary replicas, because bringing a secondary replica online
+ * triggers a flush on the primary replica, which always brings the data of the two regions back
+ * in sync. So here we need to simulate request errors instead.
+ */
+@Category({ FlakeyTests.class, LargeTests.class })
+public class TestRegionReplicaReplicationError {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicaReplicationError.class);
+
+  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
+
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+      List<WALEntry> entries = request.getEntryList();
+      if (CollectionUtils.isEmpty(entries)) {
+        return ReplicateWALEntryResponse.getDefaultInstance();
+      }
+      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+      HRegion region;
+      try {
+        region = server.getRegionByEncodedName(regionName.toStringUtf8());
+      } catch (NotServingRegionException e) {
+        throw new ServiceException(e);
+      }
+      // fail the first several requests to secondary replica 1
+      if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
+        throw new ServiceException("Inject error!");
+      }
+      return super.replay(controller, request);
+    }
+
+  }
+
+  public static final class RSForTest
+    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
+
+    public RSForTest(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new ErrorReplayRSRpcServices(this);
+    }
+  }
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  private static TableName TN = TableName.valueOf("test");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    HTU.getConfiguration().setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY,
+      true);
+    HTU.startMiniCluster(
+      StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+    HTU.getAdmin().createTable(td);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private boolean checkReplica(Table table, int replicaId) throws IOException {
+    boolean ret = true;
+    for (int i = 0; i < 500; i++) {
+      Result result = table.get(new Get(Bytes.toBytes(i)).setReplicaId(replicaId));
+      byte[] value = result.getValue(CF, CQ);
+      ret &= value != null && value.length > 0 && Bytes.toInt(value) == i;
+    }
+    return ret;
+  }
+
+  @Test
+  public void test() throws IOException {
+    try (Table table = HTU.getConnection().getTable(TN)) {
+      for (int i = 0; i < 500; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+      HTU.waitFor(30000, () -> checkReplica(table, 2));
+      HTU.waitFor(30000, () -> checkReplica(table, 1));
+    }
+  }
+}
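
The test above injects failures into the replay RPC for replica 1 until roughly 100 entries have been attempted, then expects both secondaries to converge once retries and the flush-based recovery kick in. Note that checkReplica deliberately returns a boolean instead of asserting, so it can serve as the polling predicate for HTU.waitFor. For readers without the HBase test utilities at hand, waitFor amounts to this kind of poll loop (a hypothetical stand-in, not the real implementation):

import java.util.concurrent.Callable;

public class WaitForSketch {

  // poll the predicate until it holds or the timeout elapses
  static void waitFor(long timeoutMs, Callable<Boolean> predicate) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (predicate.call()) {
        return;
      }
      Thread.sleep(100);
    }
    throw new AssertionError("condition not met within " + timeoutMs + " ms");
  }
}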
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 7dd4255..231c9e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
+ * Tests region replication by setting up region replicas and verifying async wal replication
+ * replays the edits to the secondary region in various scenarios.
  */
 @Category({FlakeyTests.class, LargeTests.class})
 public class TestRegionReplicaReplication {