Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/27 13:53:21 UTC

[hbase] 10/12: HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 81823bb875de4d18866789a4c427e68134208775
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Nov 27 12:03:18 2021 +0800

    HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
    Reviewed-by: GeorryHuang <hu...@apache.org>
---
 .../hbase/client/AsyncRpcRetryingCaller.java       |  12 +-
 .../hadoop/hbase/replication/ReplicationUtils.java |   3 +
 .../AsyncRegionReplicationRetryingCaller.java      |  41 +++++--
 .../master/replication/ReplicationPeerManager.java |  12 +-
 .../regionserver/ReplicationSourceManager.java     |   9 +-
 .../hbase/client/TestFallbackToUseReplay.java      | 129 +++++++++++++++++++++
 ...StartupWithLegacyRegionReplicationEndpoint.java |  85 ++++++++++++++
 7 files changed, 278 insertions(+), 13 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 586e7d5..8648572 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -147,6 +147,16 @@ public abstract class AsyncRpcRetryingCaller<T> {
     return Optional.empty();
   }
 
+  // Subclasses can override this method to change the error type, in order to control the retry
+  // logic. For example, during a rolling upgrade, calling this newly added method will fail with
+  // an UnsupportedOperationException (wrapped in a DNRIOE). In that case we may want to fall back
+  // to the old method, so the subclass can change the exception type to something that is not a
+  // DNRIOE; a retry will then be scheduled, and on the next attempt the subclass can use the old
+  // method to make the rpc call.
+  protected Throwable preProcessError(Throwable error) {
+    return error;
+  }
+
   protected final void onError(Throwable t, Supplier<String> errMsg,
       Consumer<Throwable> updateCachedLocation) {
     if (future.isDone()) {
@@ -156,7 +166,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
       LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
       return;
     }
-    Throwable error = translateException(t);
+    Throwable error = preProcessError(translateException(t));
     // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
     // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
     // also fetch data when opening a scanner. The intention here is that if we hit a
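
To make the contract of the new preProcessError hook above easier to see outside the diff, here is a minimal, self-contained sketch of the intended fallback pattern. All names other than preProcessError are illustrative and not part of HBase; the concrete implementation for region replication is in AsyncRegionReplicationRetryingCaller below.

    import org.apache.hadoop.hbase.DoNotRetryIOException;

    // Illustrative sketch only: a stripped-down retrying caller showing how a subclass can use
    // preProcessError() to downgrade a "method not implemented" failure and switch to a legacy RPC.
    abstract class RetryingCallerSketch {

      // Mirrors AsyncRpcRetryingCaller#preProcessError: the default implementation is a no-op.
      protected Throwable preProcessError(Throwable error) {
        return error;
      }

      protected final void onError(Throwable t) {
        Throwable error = preProcessError(t);
        if (error instanceof DoNotRetryIOException) {
          completeExceptionally(error); // give up immediately
        } else {
          scheduleRetry(); // try again later
        }
      }

      abstract void completeExceptionally(Throwable error);

      abstract void scheduleRetry();
    }

    class FallbackCallerSketch extends RetryingCallerSketch {

      // Flipped once the remote server reports that it does not know the new RPC method.
      private boolean useLegacyRpc;

      @Override
      protected Throwable preProcessError(Throwable error) {
        if (error instanceof DoNotRetryIOException
          && error.getCause() instanceof UnsupportedOperationException) {
          useLegacyRpc = true; // the next attempt will issue the old RPC
          return error.getCause(); // not a DNRIOE, so onError() will schedule a retry
        }
        return error;
      }

      @Override
      void completeExceptionally(Throwable error) {
        // complete the pending future exceptionally (omitted in this sketch)
      }

      @Override
      void scheduleRetry() {
        // re-issue the call, checking useLegacyRpc when choosing which RPC method to invoke
      }
    }
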
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206..e8ecec2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -56,6 +56,9 @@ public final class ReplicationUtils {
   // since some FileSystem implementation may not support atomic rename.
   public static final String RENAME_WAL_SUFFIX = ".ren";
 
+  public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
+    "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
+
   private ReplicationUtils() {
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
index c854ba3..726559f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.util.Pair;
@@ -43,6 +44,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
 
   private final Entry[] entries;
 
+  // Whether to use replay instead of replicateToReplica. During a rolling upgrade, if the target
+  // region server has not been upgraded yet, it will not have the replicateToReplica method, so we
+  // fall back to the replay method instead, even though it is not perfect.
+  private boolean useReplay;
+
   public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
     AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
     RegionInfo replica, List<Entry> entries) {
@@ -53,6 +59,27 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
     this.entries = entries.toArray(new Entry[0]);
   }
 
+  @Override
+  protected Throwable preProcessError(Throwable error) {
+    if (error instanceof DoNotRetryIOException &&
+      error.getCause() instanceof UnsupportedOperationException) {
+      // fall back to replay, and return the cause so the upper layer will schedule a retry
+      useReplay = true;
+      return error.getCause();
+    }
+    return error;
+  }
+
+  private void onComplete(HRegionLocation loc) {
+    if (controller.failed()) {
+      onError(controller.getFailed(),
+        () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+    } else {
+      future.complete(null);
+    }
+  }
+
   private void call(HRegionLocation loc) {
     AdminService.Interface stub;
     try {
@@ -67,15 +94,11 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
       .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
     resetCallTimeout();
     controller.setCellScanner(pair.getSecond());
-    stub.replicateToReplica(controller, pair.getFirst(), r -> {
-      if (controller.failed()) {
-        onError(controller.getFailed(),
-          () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
-          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      } else {
-        future.complete(null);
-      }
-    });
+    if (useReplay) {
+      stub.replay(controller, pair.getFirst(), r -> onComplete(loc));
+    } else {
+      stub.replicateToReplica(controller, pair.getFirst(), r -> onComplete(loc));
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index d9829a5..b62d4b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -69,6 +71,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 @InterfaceAudience.Private
 public class ReplicationPeerManager {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
+
   private final ReplicationPeerStorage peerStorage;
 
   private final ReplicationQueueStorage queueStorage;
@@ -546,7 +550,13 @@ public class ReplicationPeerManager {
     ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
       ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
-
+      if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
+        .equals(peerConfig.getReplicationEndpointImpl())) {
+        // we do not use this endpoint for region replication any more, see HBASE-26233
+        LOG.warn("Legacy region replication peer found, removing: {}", peerConfig);
+        peerStorage.removePeer(peerId);
+        continue;
+      }
       peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
       peerStorage.updatePeerConfig(peerId, peerConfig);
       boolean enabled = peerStorage.isPeerEnabled(peerId);
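
With this change the master drops a leftover peer that still points at the legacy RegionReplicaReplicationEndpoint while loading peers at startup. As a rough operator-side sketch of the same cleanup through the public Admin API (not part of this commit; for example run against the cluster before starting the rolling upgrade), one could do something like:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;

    public final class RemoveLegacyRegionReplicationPeer {

      // same class name the patch records in ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
      private static final String LEGACY_ENDPOINT =
        "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";

      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          for (ReplicationPeerDescription desc : admin.listReplicationPeers()) {
            if (LEGACY_ENDPOINT.equals(desc.getPeerConfig().getReplicationEndpointImpl())) {
              // the new framework no longer uses this endpoint, see HBASE-26233
              System.out.println("Removing legacy region replication peer " + desc.getPeerId());
              admin.removeReplicationPeer(desc.getPeerId());
            }
          }
        }
      }
    }

Matching on the endpoint class, as the patch itself does, is safer than assuming a particular peer id for the legacy framework.
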
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9f8d8dc..7a16e35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -342,8 +342,14 @@ public class ReplicationSourceManager {
    * @param peerId the id of the replication peer
    * @return the source that was created
    */
-  ReplicationSourceInterface addSource(String peerId) throws IOException {
+  void addSource(String peerId) throws IOException {
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
+    if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
+      .equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
+      // we do not use this endpoint for region replication any more, see HBASE-26233
+      LOG.warn("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
+      return;
+    }
     ReplicationSourceInterface src = createSource(peerId, peer);
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
@@ -370,7 +376,6 @@ public class ReplicationSourceManager {
       syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
     }
     src.startup();
-    return src;
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java
new file mode 100644
index 0000000..35c4d95
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFallbackToUseReplay.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface;
+
+/**
+ * Make sure we can fall back to the replay method if the replicateToReplica method is not
+ * present, i.e., we are connecting to an old region server.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestFallbackToUseReplay {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFallbackToUseReplay.class);
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  private static AsyncClusterConnectionImpl CONN;
+
+  private static AsyncRegionReplicationRetryingCaller CALLER;
+
+  private static RegionInfo REPLICA =
+    RegionInfoBuilder.newBuilder(TableName.valueOf("test")).setReplicaId(1).build();
+
+  private static AtomicBoolean REPLAY_CALLED = new AtomicBoolean(false);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    CONF.setInt(AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+    AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
+    when(locator.getRegionLocation(any(), any(), anyInt(), any(), anyLong()))
+      .thenReturn(CompletableFuture.completedFuture(new HRegionLocation(REPLICA,
+        ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()))));
+    AdminService.Interface stub = mock(AdminService.Interface.class);
+    // fail the call to replicateToReplica
+    doAnswer(i -> {
+      HBaseRpcController controller = i.getArgument(0, HBaseRpcController.class);
+      controller.setFailed(new DoNotRetryIOException(new UnsupportedOperationException()));
+      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
+      done.run(null);
+      return null;
+    }).when(stub).replicateToReplica(any(), any(), any());
+    doAnswer(i -> {
+      REPLAY_CALLED.set(true);
+      RpcCallback<?> done = i.getArgument(2, RpcCallback.class);
+      done.run(null);
+      return null;
+    }).when(stub).replay(any(), any(), any());
+    CONN = new AsyncClusterConnectionImpl(CONF, mock(ConnectionRegistry.class), "test", null,
+      User.getCurrent()) {
+
+      @Override
+      AsyncRegionLocator getLocator() {
+        return locator;
+      }
+
+      @Override
+      Interface getAdminStub(ServerName serverName) throws IOException {
+        return stub;
+      }
+    };
+    CALLER = new AsyncRegionReplicationRetryingCaller(AsyncClusterConnectionImpl.RETRY_TIMER, CONN,
+      10, TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10), REPLICA,
+      Collections.emptyList());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    Closeables.close(CONN, true);
+  }
+
+  @Test
+  public void testFallback() {
+    CALLER.call().join();
+    assertTrue(REPLAY_CALLED.get());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
new file mode 100644
index 0000000..2c75d37
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestStartupWithLegacyRegionReplicationEndpoint.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure we can start the cluster with the legacy RegionReplicaReplicationEndpoint configured.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestStartupWithLegacyRegionReplicationEndpoint {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestStartupWithLegacyRegionReplicationEndpoint.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("127.0.0.1:2181:/hbase")
+      .setReplicationEndpointImpl(ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME).build();
+    // can not use Admin.addPeer as it will fail with ClassNotFound
+    UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().addPeer("legacy", peerConfig,
+      true);
+    UTIL.getMiniHBaseCluster().stopRegionServer(0);
+    RegionServerThread rst = UTIL.getMiniHBaseCluster().startRegionServer();
+    // we should still have this peer
+    assertNotNull(UTIL.getAdmin().getReplicationPeerConfig("legacy"));
+    // but on the RS side, this peer should not be loaded as a replication source
+    assertTrue(rst.getRegionServer().getReplicationSourceService().getReplicationManager()
+      .getSources().isEmpty());
+    UTIL.shutdownMiniHBaseCluster();
+    UTIL.restartHBaseCluster(1);
+    // now we should have removed the peer
+    assertThrows(ReplicationPeerNotFoundException.class,
+      () -> UTIL.getAdmin().getReplicationPeerConfig("legacy"));
+    // on the RS side, this time the peer itself should be gone, not just its replication source
+    assertTrue(UTIL.getMiniHBaseCluster().getRegionServer(0).getReplicationSourceService()
+      .getReplicationManager().getReplicationPeers().getAllPeerIds().isEmpty());
+  }
+}