You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/27 10:21:21 UTC
[35/50] [abbrv] hbase git commit: HBASE-20285 Delete all last pushed
sequence ids when removing a peer or removing the serial flag for a peer
HBASE-20285 Delete all last pushed sequence ids when removing a peer or removing the serial flag for a peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/056c3395
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/056c3395
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/056c3395
Branch: refs/heads/HBASE-19064
Commit: 056c3395d952f9e6d9c08b734c2a970ce935ec85
Parents: 15c398f
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 26 22:17:00 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Mar 27 12:20:51 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/MasterProcedure.proto | 10 +++
.../replication/ReplicationQueueStorage.java | 5 ++
.../replication/ZKReplicationQueueStorage.java | 37 ++++++++++-
.../TestZKReplicationQueueStorage.java | 31 ++++++++-
.../replication/DisablePeerProcedure.java | 15 +++++
.../master/replication/EnablePeerProcedure.java | 15 +++++
.../master/replication/RemovePeerProcedure.java | 31 ++++++++-
.../replication/ReplicationPeerManager.java | 8 ++-
.../replication/UpdatePeerConfigProcedure.java | 3 +
.../replication/SerialReplicationTestBase.java | 19 +++++-
.../TestAddToSerialReplicationPeer.java | 28 ++-------
.../replication/TestSerialReplication.java | 66 +++++++++++++++++---
12 files changed, 226 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f710759..b37557c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -421,3 +421,13 @@ message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1;
optional ReplicationPeer old_peer_config = 2;
}
+
+message RemovePeerStateData {
+ optional ReplicationPeer peer_config = 1;
+}
+
+message EnablePeerStateData {
+}
+
+message DisablePeerStateData {
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 99a1e97..cd37ac2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -87,6 +87,11 @@ public interface ReplicationQueueStorage {
void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
/**
+ * Remove all the max sequence id records for the given peer.
+ * @param peerId peer id
+ */
+ void removeLastSequenceIds(String peerId) throws ReplicationException;
+ /**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 19986f1..a629da3 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -103,7 +103,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
*/
private final String hfileRefsZNode;
- private final String regionsZNode;
+ @VisibleForTesting
+ final String regionsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
@@ -313,6 +314,40 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
+ public void removeLastSequenceIds(String peerId) throws ReplicationException {
+ String suffix = "-" + peerId;
+ try {
+ StringBuilder sb = new StringBuilder(regionsZNode);
+ int regionsZNodeLength = regionsZNode.length();
+ int levelOneLength = regionsZNodeLength + 3;
+ int levelTwoLength = levelOneLength + 3;
+ List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
+ // it is possible that levelOneDirs is null if we haven't written any last pushed sequence
+ // ids yet, so we need an extra check here.
+ if (CollectionUtils.isEmpty(levelOneDirs)) {
+ return;
+ }
+ for (String levelOne : levelOneDirs) {
+ sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
+ for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
+ sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
+ for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
+ if (znode.endsWith(suffix)) {
+ sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
+ ZKUtil.deleteNode(zookeeper, sb.toString());
+ sb.setLength(levelTwoLength);
+ }
+ }
+ sb.setLength(levelOneLength);
+ }
+ sb.setLength(regionsZNodeLength);
+ }
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
+ }
+ }
+
+ @Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 5821271..74a24ac 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -32,15 +32,17 @@ import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -71,7 +73,7 @@ public class TestZKReplicationQueueStorage {
}
@After
- public void tearDownAfterTest() throws ReplicationException {
+ public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
for (ServerName serverName : STORAGE.getListOfReplicators()) {
for (String queue : STORAGE.getAllQueues(serverName)) {
STORAGE.removeQueue(serverName, queue);
@@ -301,6 +303,29 @@ public class TestZKReplicationQueueStorage {
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
- Assert.assertEquals(expectedPath, path);
+ assertEquals(expectedPath, path);
+ }
+
+ @Test
+ public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
+ String peerId = "1";
+ String peerIdToDelete = "2";
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
+ STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
+ }
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
+ assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
+ }
+ STORAGE.removeLastSequenceIds(peerIdToDelete);
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
+ assertEquals(HConstants.NO_SEQNUM,
+ STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
index 0871575..7bda1d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisablePeerStateData;
+
/**
* The procedure for disabling a replication peer.
*/
@@ -67,4 +70,16 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
cpHost.postDisableReplicationPeer(peerId);
}
}
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(DisablePeerStateData.getDefaultInstance());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ serializer.deserialize(DisablePeerStateData.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
index 890462f..530d4cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnablePeerStateData;
+
/**
* The procedure for enabling a replication peer.
*/
@@ -67,4 +70,16 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
cpHost.postEnableReplicationPeer(peerId);
}
}
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(EnablePeerStateData.getDefaultInstance());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ serializer.deserialize(EnablePeerStateData.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 64faf2b..82dc07e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -18,13 +18,18 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;
+
/**
* The procedure for removing a replication peer.
*/
@@ -33,6 +38,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
private static final Logger LOG = LoggerFactory.getLogger(RemovePeerProcedure.class);
+ private ReplicationPeerConfig peerConfig;
+
public RemovePeerProcedure() {
}
@@ -51,7 +58,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preRemoveReplicationPeer(peerId);
}
- env.getReplicationPeerManager().preRemovePeer(peerId);
+ peerConfig = env.getReplicationPeerManager().preRemovePeer(peerId);
}
@Override
@@ -63,10 +70,32 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+ if (peerConfig.isSerial()) {
+ env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+ }
LOG.info("Successfully removed peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId);
}
}
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ RemovePeerStateData.Builder builder = RemovePeerStateData.newBuilder();
+ if (peerConfig != null) {
+ builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+ }
+ serializer.serialize(builder.build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ RemovePeerStateData data = serializer.deserialize(RemovePeerStateData.class);
+ if (data.hasPeerConfig()) {
+ this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index a0e01e0..87d0111 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -109,8 +109,8 @@ public class ReplicationPeerManager {
return desc;
}
- void preRemovePeer(String peerId) throws DoNotRetryIOException {
- checkPeerExists(peerId);
+ ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
+ return checkPeerExists(peerId).getPeerConfig();
}
void preEnablePeer(String peerId) throws DoNotRetryIOException {
@@ -220,6 +220,10 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
+ void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
+ queueStorage.removeLastSequenceIds(peerId);
+ }
+
void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index b7e670a..ccfd4a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -107,6 +107,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
+ if (oldPeerConfig.isSerial() && !peerConfig.isSerial()) {
+ env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+ }
LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index b5aae85..4b7fa87 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -26,8 +26,13 @@ import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -129,7 +134,10 @@ public class SerialReplicationTestBase {
@After
public void tearDown() throws Exception {
- UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ Admin admin = UTIL.getAdmin();
+ for (ReplicationPeerDescription pd : admin.listReplicationPeers()) {
+ admin.removeReplicationPeer(pd.getPeerId());
+ }
rollAllWALs();
if (WRITER != null) {
WRITER.close();
@@ -233,4 +241,13 @@ public class SerialReplicationTestBase {
assertEquals(expectedEntries, count);
}
}
+
+ protected final TableName createTable() throws IOException, InterruptedException {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ return tableName;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
index 64b5bb1..317c120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -21,14 +21,11 @@ import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -88,11 +85,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test
public void testAddPeer() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -118,12 +111,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
- TableName tableName = TableName.valueOf(name.getMethodName());
-
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -159,11 +147,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
.setReplicateAllUserTables(false).setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
- TableName tableName = TableName.valueOf(name.getMethodName());
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -190,11 +174,7 @@ public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
@Test
public void testDisabledTable() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
http://git-wip-us.apache.org/repos/asf/hbase/blob/056c3395/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 9271068..07e626b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -65,11 +65,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test
public void testRegionMove() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -89,11 +85,7 @@ public class TestSerialReplication extends SerialReplicationTestBase {
@Test
public void testRegionSplit() throws Exception {
- TableName tableName = TableName.valueOf(name.getMethodName());
- UTIL.getAdmin().createTable(
- TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
- UTIL.waitTableAvailable(tableName);
+ TableName tableName = createTable();
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
@@ -204,4 +196,58 @@ public class TestSerialReplication extends SerialReplicationTestBase {
assertEquals(200, count);
}
}
+
+ @Test
+ public void testRemovePeerNothingReplicated() throws Exception {
+ TableName tableName = createTable();
+ String encodedRegionName =
+ UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+ UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+ }
+
+ @Test
+ public void testRemovePeer() throws Exception {
+ TableName tableName = createTable();
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ enablePeerAndWaitUntilReplicationDone(100);
+ checkOrder(100);
+ String encodedRegionName =
+ UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
+ UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ // confirm that we delete the last pushed sequence id
+ assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+ }
+
+ @Test
+ public void testRemoveSerialFlag() throws Exception {
+ TableName tableName = createTable();
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ enablePeerAndWaitUntilReplicationDone(100);
+ checkOrder(100);
+ String encodedRegionName =
+ UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
+ ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
+ UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+ ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
+ // confirm that we delete the last pushed sequence id
+ assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
+ }
}