You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/08 12:17:11 UTC

[30/31] hbase git commit: HBASE-20432 Cleanup related resources when remove a sync replication peer

HBASE-20432 Cleanup related resources when remove a sync replication peer


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/56b7707d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/56b7707d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/56b7707d

Branch: refs/heads/HBASE-19064
Commit: 56b7707d4f80e64550c3de0fd103de23b86d7f50
Parents: 7adb055
Author: huzheng <op...@gmail.com>
Authored: Wed Apr 18 20:38:33 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 8 20:15:45 2018 +0800

----------------------------------------------------------------------
 .../master/replication/RemovePeerProcedure.java | 10 +++++
 .../ReplaySyncReplicationWALManager.java        |  8 ++++
 .../replication/SyncReplicationTestBase.java    | 45 +++++++++++++++++---
 .../replication/TestSyncReplicationActive.java  |  9 ++--
 .../replication/TestSyncReplicationStandBy.java | 31 ++++++++++++--
 5 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/56b7707d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 82dc07e..7335fe0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -66,9 +66,19 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
     env.getReplicationPeerManager().removePeer(peerId);
   }
 
+  private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
+    ReplaySyncReplicationWALManager remoteWALManager =
+        env.getMasterServices().getReplaySyncReplicationWALManager();
+    remoteWALManager.removePeerRemoteWALs(peerId);
+    remoteWALManager.removePeerReplayWALDir(peerId);
+  }
+
   @Override
   protected void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException {
+    if (peerConfig.isSyncReplication()) {
+      removeRemoteWALs(env);
+    }
     env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
     if (peerConfig.isSerial()) {
       env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/56b7707d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index 72f5c37..eac5aa4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -115,6 +115,14 @@ public class ReplaySyncReplicationWALManager {
     }
   }
 
+  public void removePeerRemoteWALs(String peerId) throws IOException {
+    Path remoteWALDir = getPeerRemoteWALDir(peerId);
+    if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
+      throw new IOException(
+          "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
+    }
+  }
+
   public void initPeerWorkers(String peerId) {
     BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
     services.getServerManager().getOnlineServers().keySet()

http://git-wip-us.apache.org/repos/asf/hbase/blob/56b7707d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 0d5fce8..de679be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -71,6 +72,10 @@ public class SyncReplicationTestBase {
 
   protected static String PEER_ID = "1";
 
+  protected static Path remoteWALDir1;
+
+  protected static Path remoteWALDir2;
+
   private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
     util.setZkCluster(ZK_UTIL.getZkCluster());
     Configuration conf = util.getConfiguration();
@@ -104,11 +109,11 @@ public class SyncReplicationTestBase {
     UTIL2.getAdmin().createTable(td);
     FileSystem fs1 = UTIL1.getTestFileSystem();
     FileSystem fs2 = UTIL2.getTestFileSystem();
-    Path remoteWALDir1 =
-      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+    remoteWALDir1 =
+      new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
         "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
-    Path remoteWALDir2 =
-      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+    remoteWALDir2 =
+      new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
         "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
     UTIL1.getAdmin().addReplicationPeer(PEER_ID,
       ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
@@ -188,7 +193,37 @@ public class SyncReplicationTestBase {
 
   protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
     Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
-    return new Path(remoteWALDir, PEER_ID);
+    return getRemoteWALDir(remoteWALDir, peerId);
+  }
+
+  protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
+    return new Path(remoteWALDir, peerId);
+  }
+
+  protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
+    return new Path(remoteWALDir, peerId + "-replay");
+  }
+
+  protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
+      throws Exception {
+    ReplicationPeerStorage rps = ReplicationStorageFactory
+        .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
+    try {
+      rps.getPeerSyncReplicationState(peerId);
+      fail("Should throw exception when get the sync replication state of a removed peer.");
+    } catch (NullPointerException e) {
+      // ignore.
+    }
+    try {
+      rps.getPeerNewSyncReplicationState(peerId);
+      fail("Should throw exception when get the new sync replication state of a removed peer");
+    } catch (NullPointerException e) {
+      // ignore.
+    }
+    try (FileSystem fs = utility.getTestFileSystem()) {
+      Assert.assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
+      Assert.assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
+    }
   }
 
   protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,

http://git-wip-us.apache.org/repos/asf/hbase/blob/56b7707d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index f9020a0..b663c44 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -58,7 +58,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
 
     // Ensure that there's no cluster id in remote log entries.
-    verifyNoClusterIdInRemoteLog(UTIL2, PEER_ID);
+    verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID);
 
     UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
       SyncReplicationState.DOWNGRADE_ACTIVE);
@@ -84,12 +84,9 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
     write(UTIL2, 200, 300);
   }
 
-  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, String peerId)
-      throws Exception {
+  private void verifyNoClusterIdInRemoteLog(HBaseTestingUtility utility, Path remoteDir,
+      String peerId) throws Exception {
     FileSystem fs2 = utility.getTestFileSystem();
-    Path remoteDir =
-        new Path(utility.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
-            "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
     FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
     Assert.assertTrue(files.length > 0);
     for (FileStatus file : files) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/56b7707d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
index ed61d2a..8526af8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -84,13 +87,35 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
       assertDisallow(table,
         t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
       assertDisallow(table,
-        t -> t
-          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+        t -> t.put(
+          Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
             new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
       assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
-        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+          .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
     }
     // We should still allow replication writes
     writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+    // Remove the peers in ACTIVE & STANDBY cluster.
+    FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration());
+    Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
+
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
+    Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+
+    UTIL1.getAdmin().removeReplicationPeer(PEER_ID);
+    verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1);
+
+    // Peer remoteWAL dir will be renamed to replay WAL dir when transiting from S to DA, and the
+    // replay WAL dir will be removed after replaying all WALs, so create an empty dir here to test
+    // whether the removeReplicationPeer would remove the remoteWAL dir.
+    fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID));
+    fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID));
+    Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
+    Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+    UTIL2.getAdmin().removeReplicationPeer(PEER_ID);
+    verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2);
   }
 }