You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/17 05:01:58 UTC

[16/30] hbase git commit: HBASE-15974 Create a ReplicationQueuesClientHBaseImpl

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
index bd6d070..346ff37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateHBaseImpl.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -58,15 +59,20 @@ public class TestReplicationStateHBaseImpl {
   private static ReplicationQueues rq1;
   private static ReplicationQueues rq2;
   private static ReplicationQueues rq3;
+  private static ReplicationQueuesClient rqc;
   private static ReplicationPeers rp;
 
-  private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 123L)
-      .toString();
+
+  private static final String server0 = ServerName.valueOf("hostname0.example.org", 1234, -1L)
+    .toString();
+  private static final String server1 = ServerName.valueOf("hostname1.example.org", 1234, 1L)
+    .toString();
   private static final String server2 = ServerName.valueOf("hostname2.example.org", 1234, 1L)
-      .toString();
+    .toString();
   private static final String server3 = ServerName.valueOf("hostname3.example.org", 1234, 1L)
-      .toString();
+    .toString();
 
+  private static DummyServer ds0;
   private static DummyServer ds1;
   private static DummyServer ds2;
   private static DummyServer ds3;
@@ -77,9 +83,9 @@ public class TestReplicationStateHBaseImpl {
     utility.startMiniCluster();
     conf = utility.getConfiguration();
     conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
-        ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
-    conf.setClass("hbase.region.replica.replication.ReplicationQueuesType",
-      ReplicationQueuesHBaseImpl.class, ReplicationQueues.class);
+      TableBasedReplicationQueuesImpl.class, ReplicationQueues.class);
+    conf.setClass("hbase.region.replica.replication.ReplicationQueuesClientType",
+      TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
@@ -88,6 +94,9 @@ public class TestReplicationStateHBaseImpl {
   @Before
   public void setUp() {
     try {
+      ds0 = new DummyServer(server0);
+      rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
+        conf, ds0));
       ds1 = new DummyServer(server1);
       rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
       rq1.init(server1);
@@ -99,9 +108,6 @@ public class TestReplicationStateHBaseImpl {
       rq3.init(server3);
       rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
       rp.init();
-      rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
-      rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
-      rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
     } catch (Exception e) {
       fail("testReplicationStateHBaseConstruction received an exception" + e.getMessage());
     }
@@ -165,13 +171,13 @@ public class TestReplicationStateHBaseImpl {
       try {
         rq1.getLogPosition("Queue1", "NotHereWAL");
         fail("Replication queue should have thrown a ReplicationException for reading from a " +
-            "non-existent WAL");
+          "non-existent WAL");
       } catch (ReplicationException e) {
       }
       try {
         rq1.getLogPosition("NotHereQueue", "NotHereWAL");
         fail("Replication queue should have thrown a ReplicationException for reading from a " +
-            "non-existent queue");
+          "non-existent queue");
       } catch (ReplicationException e) {
       }
       // Test removing logs
@@ -198,6 +204,13 @@ public class TestReplicationStateHBaseImpl {
   @Test
   public void TestMultipleReplicationQueuesHBaseImpl () {
     try {
+      rp.addPeer("Queue1", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus1"));
+      rp.addPeer("Queue2", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus2"));
+      rp.addPeer("Queue3", new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus3"));
+    } catch (ReplicationException e) {
+      fail("Failed to add peers to ReplicationPeers");
+    }
+    try {
       // Test adding in WAL files
       rq1.addLog("Queue1", "WALLogFile1.1");
       rq1.addLog("Queue1", "WALLogFile1.2");
@@ -298,6 +311,56 @@ public class TestReplicationStateHBaseImpl {
     }
   }
 
+  @Test
+  public void TestReplicationQueuesClient() throws Exception{
+
+    // Test ReplicationQueuesClient log tracking
+    rq1.addLog("Queue1", "WALLogFile1.1");
+    assertEquals(1, rqc.getLogsInQueue(server1, "Queue1").size());
+    rq1.removeLog("Queue1", "WALLogFile1.1");
+    assertEquals(0, rqc.getLogsInQueue(server1, "Queue1").size());
+    rq2.addLog("Queue2", "WALLogFile2.1");
+    rq2.addLog("Queue2", "WALLogFile2.2");
+    assertEquals(2, rqc.getLogsInQueue(server2, "Queue2").size());
+    rq3.addLog("Queue1", "WALLogFile1.1");
+    rq3.addLog("Queue3", "WALLogFile3.1");
+    rq3.addLog("Queue3", "WALLogFile3.2");
+
+    // Test ReplicationQueueClient log tracking for faulty cases
+    assertEquals(0, ds0.getAbortCount());
+    assertNull(rqc.getLogsInQueue("NotHereServer", "NotHereQueue"));
+    assertNull(rqc.getLogsInQueue(server1, "NotHereQueue"));
+    assertNull(rqc.getLogsInQueue("NotHereServer", "WALLogFile1.1"));
+    assertEquals(3, ds0.getAbortCount());
+    // Test ReplicationQueueClient replicators
+    List<String> replicators = rqc.getListOfReplicators();
+    assertEquals(3, replicators.size());
+    assertTrue(replicators.contains(server1));
+    assertTrue(replicators.contains(server2));
+    rq1.removeQueue("Queue1");
+    assertEquals(2, rqc.getListOfReplicators().size());
+
+    // Test ReplicationQueuesClient queue tracking
+    assertEquals(0, rqc.getAllQueues(server1).size());
+    rq1.addLog("Queue2", "WALLogFile2.1");
+    rq1.addLog("Queue3", "WALLogFile3.1");
+    assertEquals(2, rqc.getAllQueues(server1).size());
+    rq1.removeAllQueues();
+    assertEquals(0, rqc.getAllQueues(server1).size());
+
+    // Test ReplicationQueuesClient queue tracking for faulty cases
+    assertEquals(0, rqc.getAllQueues("NotHereServer").size());
+
+    // Test ReplicationQueuesClient get all WAL's
+    assertEquals(5 , rqc.getAllWALs().size());
+    rq3.removeLog("Queue1", "WALLogFile1.1");
+    assertEquals(4, rqc.getAllWALs().size());
+    rq3.removeAllQueues();
+    assertEquals(2, rqc.getAllWALs().size());
+    rq2.removeAllQueues();
+    assertEquals(0, rqc.getAllWALs().size());
+  }
+
   @After
   public void clearQueues() throws Exception{
     rq1.removeAllQueues();
@@ -306,6 +369,7 @@ public class TestReplicationStateHBaseImpl {
     assertEquals(0, rq1.getAllQueues().size());
     assertEquals(0, rq2.getAllQueues().size());
     assertEquals(0, rq3.getAllQueues().size());
+    ds0.resetAbortCount();
     ds1.resetAbortCount();
     ds2.resetAbortCount();
     ds3.resetAbortCount();
@@ -313,7 +377,7 @@ public class TestReplicationStateHBaseImpl {
 
   @After
   public void tearDown() throws KeeperException, IOException {
-     ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+    ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 972a400..a357a1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -96,11 +96,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
       rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
       rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
       rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
+      rqc = ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(conf, ds1, zkw));
     } catch (Exception e) {
-      // This should not occur, because getReplicationQueues() only throws for ReplicationQueuesHBaseImpl
+      // This should not occur, because getReplicationQueues() only throws for
+      // TableBasedReplicationQueuesImpl
       fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
     }
-    rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
     rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
     OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
     rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2093aade/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index e14fd3c..bf47d4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -436,8 +438,9 @@ public class TestReplicationSourceManager {
           s1.getZooKeeper()));
     rq1.init(s1.getServerName().toString());
 
-    ReplicationQueuesClient client =
-        ReplicationFactory.getReplicationQueuesClient(s1.getZooKeeper(), s1.getConfiguration(), s1);
+    ReplicationQueuesClientZKImpl client =
+        (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
+        new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
 
     int v0 = client.getQueuesZNodeCversion();
     rq1.claimQueues(s0.getServerName().getServerName());