You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/11/07 08:27:11 UTC
hbase git commit: HBASE-21441 NPE if RS restarts between
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN and
TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
Repository: hbase
Updated Branches:
refs/heads/master 86cbbdea9 -> 6d46b8d25
HBASE-21441 NPE if RS restarts between REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN and TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d46b8d2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d46b8d2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d46b8d2
Branch: refs/heads/master
Commit: 6d46b8d256bcd63349ea83e4a588b879a122854a
Parents: 86cbbde
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 6 22:06:04 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Nov 7 16:10:42 2018 +0800
----------------------------------------------------------------------
.../replication/regionserver/Replication.java | 11 ++
...yncReplicationNewRSJoinBetweenRefreshes.java | 125 +++++++++++++++++++
2 files changed, 136 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d46b8d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b04f0cb..799d975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
@@ -137,6 +138,16 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
peerActionListener = syncWALProvider;
syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
+ // A sync replication state change makes the region server reload the peer state twice (see
+ // PeerProcedureHandlerImpl). Go over the sync replication peers here to find any that are
+ // in the middle of the two refreshes; for those we must manually repeat the action done in
+ // the first refresh, otherwise the second refresh will run into trouble, such as an NPE,
+ // when it arrives.
+ replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
+ .filter(p -> p.getPeerConfig().isSyncReplication())
+ .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
+ .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
+ p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
}
}
this.statsThreadPeriod =
http://git-wip-us.apache.org/repos/asf/hbase/blob/6d46b8d2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
new file mode 100644
index 0000000..86ad8c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER_VALUE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test case for HBASE-21441: a new region server joins between the two peer state refreshes.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationNewRSJoinBetweenRefreshes extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationNewRSJoinBetweenRefreshes.class);
+
+ private static boolean HALT;
+
+ private static CountDownLatch ARRIVE;
+
+ private static CountDownLatch RESUME;
+
+ public static final class HaltCP implements RegionServerObserver, RegionServerCoprocessor {
+
+ @Override
+ public Optional<RegionServerObserver> getRegionServerObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
+ throws IOException {
+ synchronized (HaltCP.class) {
+ if (!HALT) {
+ return;
+ }
+ UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+ .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure)
+ .filter(p -> !p.isFinished()).map(p -> (TransitPeerSyncReplicationStateProcedure) p)
+ .findFirst().ifPresent(proc -> {
+ // this state comes right after REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN_VALUE
+ if (proc.getCurrentStateId() == REOPEN_ALL_REGIONS_IN_PEER_VALUE) {
+ // tell the main thread to start a new region server
+ ARRIVE.countDown();
+ try {
+ // wait for the new region server to come online
+ RESUME.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ HALT = false;
+ }
+ });
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL1.getConfiguration().setClass(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+ HaltCP.class, RegionServerObserver.class);
+ SyncReplicationTestBase.setUp();
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ ARRIVE = new CountDownLatch(1);
+ RESUME = new CountDownLatch(1);
+ HALT = true;
+ Thread t = new Thread(() -> {
+ try {
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ t.start();
+ ARRIVE.await();
+ UTIL1.getMiniHBaseCluster().startRegionServer();
+ RESUME.countDown();
+ t.join();
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ UTIL1.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID));
+ }
+}