You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/11/07 08:27:11 UTC

hbase git commit: HBASE-21441 NPE if RS restarts between REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN and TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE

Repository: hbase
Updated Branches:
  refs/heads/master 86cbbdea9 -> 6d46b8d25


HBASE-21441 NPE if RS restarts between REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN and TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6d46b8d2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6d46b8d2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6d46b8d2

Branch: refs/heads/master
Commit: 6d46b8d256bcd63349ea83e4a588b879a122854a
Parents: 86cbbde
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 6 22:06:04 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Nov 7 16:10:42 2018 +0800

----------------------------------------------------------------------
 .../replication/regionserver/Replication.java   |  11 ++
 ...yncReplicationNewRSJoinBetweenRefreshes.java | 125 +++++++++++++++++++
 2 files changed, 136 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6d46b8d2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b04f0cb..799d975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider;
@@ -137,6 +138,16 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
         SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
         peerActionListener = syncWALProvider;
         syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
+        // for sync replication state change, we need to reload the state twice, you can see the
+        // code in PeerProcedureHandlerImpl, so here we need to go over the sync replication peers
+        // to see if any of them are in the middle of the two refreshes, if so, we need to manually
+        // repeat the action we have done in the first refresh, otherwise when the second refresh
+        // comes we will be in trouble, such as NPE.
+        replicationPeers.getAllPeerIds().stream().map(replicationPeers::getPeer)
+            .filter(p -> p.getPeerConfig().isSyncReplication())
+            .filter(p -> p.getNewSyncReplicationState() != SyncReplicationState.NONE)
+            .forEach(p -> syncWALProvider.peerSyncReplicationStateChange(p.getId(),
+              p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
       }
     }
     this.statsThreadPeriod =

http://git-wip-us.apache.org/repos/asf/hbase/blob/6d46b8d2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
new file mode 100644
index 0000000..86ad8c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationNewRSJoinBetweenRefreshes.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER_VALUE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-21441.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationNewRSJoinBetweenRefreshes extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationNewRSJoinBetweenRefreshes.class);
+
+  private static boolean HALT;
+
+  private static CountDownLatch ARRIVE;
+
+  private static CountDownLatch RESUME;
+
+  public static final class HaltCP implements RegionServerObserver, RegionServerCoprocessor {
+
+    @Override
+    public Optional<RegionServerObserver> getRegionServerObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
+        throws IOException {
+      synchronized (HaltCP.class) {
+        if (!HALT) {
+          return;
+        }
+        UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
+          .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure)
+          .filter(p -> !p.isFinished()).map(p -> (TransitPeerSyncReplicationStateProcedure) p)
+          .findFirst().ifPresent(proc -> {
+            // this is the next state of REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN_VALUE
+            if (proc.getCurrentStateId() == REOPEN_ALL_REGIONS_IN_PEER_VALUE) {
+              // tell the main thread to start a new region server
+              ARRIVE.countDown();
+              try {
+                // wait for the region server to online
+                RESUME.await();
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              HALT = false;
+            }
+          });
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL1.getConfiguration().setClass(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      HaltCP.class, RegionServerObserver.class);
+    SyncReplicationTestBase.setUp();
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    ARRIVE = new CountDownLatch(1);
+    RESUME = new CountDownLatch(1);
+    HALT = true;
+    Thread t = new Thread(() -> {
+      try {
+        UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+          SyncReplicationState.DOWNGRADE_ACTIVE);
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    });
+    t.start();
+    ARRIVE.await();
+    UTIL1.getMiniHBaseCluster().startRegionServer();
+    RESUME.countDown();
+    t.join();
+    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+      UTIL1.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID));
+  }
+}