You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/17 05:01:58 UTC
[16/30] hbase git commit: HBASE-15974 Create a ReplicationQueuesClientHBaseImpl
http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index bd6d070..346ff37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -39,6 +39,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,15 +59,20 @@ public class TestReplicationStateHBaseImpl {
private static ReplicationQueues rq1;
private static ReplicationQueues rq2;
private static ReplicationQueues rq3;
+ private static ReplicationQueuesClient rqc;
private static ReplicationPeers rp;
- private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L)
- .toString();
+
+ private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L)
+ .toString();
+ private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L)
+ .toString();
private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
- .toString();
+ .toString();
private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
- .toString();
+ .toString();
+ private static DummyServer ds0;
private static DummyServer ds1;
private static DummyServer ds2;
private static DummyServer ds3;
@@ -77,9 +83,9 @@ public class TestReplicationStateHBaseImpl {
utility.startMiniCluster();
conf = utility.getConfiguration();
conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
- ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
- conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
- ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
+ TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+ conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+ TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
@@ -88,6 +94,9 @@ public class TestReplicationStateHBaseImpl {
@Before
public void setUp() {
try {
+ ds0 = new DummyServer(server0);
+ rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
+ conf, ds0));
ds1 = new DummyServer(server1);
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq1.init(server1);
@@ -99,9 +108,6 @@ public class TestReplicationStateHBaseImpl {
rq3.init(server3);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rp.init();
- rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
- rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
- rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
} catch (Exception e) {
fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage());
}
@@ -165,13 +171,13 @@ public class TestReplicationStateHBaseImpl {
try {
rq1.getLogPosition("Queue1", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
- "non-existent WAL");
+ "non-existent WAL");
} catch (ReplicationException e) {
}
try {
rq1.getLogPosition("NotHereQueue", "NotHereWAL");
fail("Replication queue should have thrown a ReplicationException for reading from a " +
- "non-existent queue");
+ "non-existent queue");
} catch (ReplicationException e) {
}
// Test removing logs
@@ -198,6 +204,13 @@ public class TestReplicationStateHBaseImpl {
@Test
public void TestMultipleReplicationQueuesHBaseImpl () {
try {
+ rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
+ rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
+ rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
+ } catch (ReplicationException e) {
+ fail("Failed to add peers to ReplicationPeers");
+ }
+ try {
// Test adding in WAL files
rq1.addLog("Queue1", "WALLogFile1.1");
rq1.addLog("Queue1", "WALLogFile1.2");
@@ -298,6 +311,56 @@ public class TestReplicationStateHBaseImpl {
}
}
+ @Test
+ public void TestReplicationQueuesClient() throws Exception{
+
+ // Test ReplicationQueuesClient log tracking
+ rq1.addLog("Queue1", "WALLogFile1.1");
+ assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size());
+ rq1.removeLog("Queue1", "WALLogFile1.1");
+ assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size());
+ rq2.addLog("Queue2", "WALLogFile2.1");
+ rq2.addLog("Queue2", "WALLogFile2.2");
+ assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size());
+ rq3.addLog("Queue1", "WALLogFile1.1");
+ rq3.addLog("Queue3", "WALLogFile3.1");
+ rq3.addLog("Queue3", "WALLogFile3.2");
+
+ // Test ReplicationQueueClient log tracking for faulty cases
+ assertEquals(0, ds0.getAbortCount());
+ assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue"));
+ assertNull(rqc.getLogsInQueue(server1, "NotHereQueue"));
+ assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1"));
+ assertEquals(3, ds0.getAbortCount());
+ // Test ReplicationQueueClient replicators
+ List<String> replicators = rqc.getListOfReplicators();
+ assertEquals(3, replicators.size());
+ assertTrue(replicators.contains(server1));
+ assertTrue(replicators.contains(server2));
+ rq1.removeQueue("Queue1");
+ assertEquals(2, rqc.getListOfReplicators().size());
+
+ // Test ReplicationQueuesClient queue tracking
+ assertEquals(0, rqc.getAllQueues(server1).size());
+ rq1.addLog("Queue2", "WALLogFile2.1");
+ rq1.addLog("Queue3", "WALLogFile3.1");
+ assertEquals(2, rqc.getAllQueues(server1).size());
+ rq1.removeAllQueues();
+ assertEquals(0, rqc.getAllQueues(server1).size());
+
+ // Test ReplicationQueuesClient queue tracking for faulty cases
+ assertEquals(0, rqc.getAllQueues("NotHereServer").size());
+
+ // Test ReplicationQueuesClient get all WAL's
+ assertEquals(5 , rqc.getAllWALs().size());
+ rq3.removeLog("Queue1", "WALLogFile1.1");
+ assertEquals(4, rqc.getAllWALs().size());
+ rq3.removeAllQueues();
+ assertEquals(2, rqc.getAllWALs().size());
+ rq2.removeAllQueues();
+ assertEquals(0, rqc.getAllWALs().size());
+ }
+
@After
public void clearQueues() throws Exception{
rq1.removeAllQueues();
@@ -306,6 +369,7 @@ public class TestReplicationStateHBaseImpl {
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq2.getAllQueues().size());
assertEquals(0, rq3.getAllQueues().size());
+ ds0.resetAbortCount();
ds1.resetAbortCount();
ds2.resetAbortCount();
ds3.resetAbortCount();
@@ -313,7 +377,7 @@ public class TestReplicationStateHBaseImpl {
@After
public void tearDown() throws KeeperException, IOException {
- ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+ ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 972a400..a357a1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -96,11 +96,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
+ rqc = ReplicationFactory.getReplicationQueuesClient(
+ new ReplicationQueuesClientArguments(conf, ds1, zkw));
} catch (Exception e) {
- // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl
+ // This should not occur, because getReplicationQueues() only throws for
+ // TableBasedReplicationQueuesImpl
fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
}
- rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index e14fd3c..bf47d4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -436,8 +438,9 @@ public class TestReplicationSourceManager {
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
- ReplicationQueuesClient client =
- ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+ ReplicationQueuesClientZKImpl client =
+ (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
+ new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
int v0 = client.getQueuesZNodeCversion();
rq1.claimQueues(s0.getServerName().getServerName());