You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/25 06:43:04 UTC

[hbase] branch master updated: HBASE-22455 Split TestReplicationStatus

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 27c02a0  HBASE-22455 Split TestReplicationStatus
27c02a0 is described below

commit 27c02a0b95f3bc89fcdcce1d7b4bd59197cf120f
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri May 24 09:41:47 2019 +0800

    HBASE-22455 Split TestReplicationStatus
---
 .../hbase/replication/TestReplicationBase.java     |  14 +-
 .../hbase/replication/TestReplicationStatus.java   | 295 +++------------------
 .../TestReplicationStatusAfterLagging.java         |  68 +++++
 ...licationStatusBothNormalAndRecoveryLagging.java |  85 ++++++
 ...ationStatusSourceStartedTargetStoppedNewOp.java |  69 +++++
 ...ationStatusSourceStartedTargetStoppedNoOps.java |  60 +++++
 ...atusSourceStartedTargetStoppedWithRecovery.java |  83 ++++++
 7 files changed, 419 insertions(+), 255 deletions(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e993a78..3091ef3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -87,6 +88,8 @@ public class TestReplicationBase {
 
   protected static HBaseTestingUtility utility1;
   protected static HBaseTestingUtility utility2;
+  protected static final int NUM_SLAVES1 = 2;
+  protected static final int NUM_SLAVES2 = 4;
   protected static final int NB_ROWS_IN_BATCH = 100;
   protected static final int NB_ROWS_IN_BIG_BATCH =
       NB_ROWS_IN_BATCH * 10;
@@ -209,6 +212,13 @@ public class TestReplicationBase {
     utility2 = new HBaseTestingUtility(conf2);
   }
 
+  protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves)
+      throws Exception {
+    util.shutdownMiniHBaseCluster();
+    util
+      .startMiniHBaseCluster(StartMiniClusterOption.builder().numRegionServers(numSlaves).build());
+  }
+
   protected static void startClusters() throws Exception{
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
@@ -224,10 +234,10 @@ public class TestReplicationBase {
     LOG.info("Setup second Zk");
 
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
-    utility1.startMiniCluster(2);
+    utility1.startMiniCluster(NUM_SLAVES1);
     // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
     // as a component in deciding maximum number of parallel batches to send to the peer cluster.
-    utility2.startMiniCluster(4);
+    utility2.startMiniCluster(NUM_SLAVES2);
 
     hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index c778f52..a305b66 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -18,15 +18,14 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.EnumSet;
 import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
-import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Put;
@@ -34,279 +33,69 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ ReplicationTests.class, MediumTests.class })
 public class TestReplicationStatus extends TestReplicationBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationStatus.class);
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class);
-  private static final String PEER_ID = "2";
+    HBaseClassTestRule.forClass(TestReplicationStatus.class);
 
   /**
-   * Test for HBASE-9531
-   * put a few rows into htable1, which should be replicated to htable2
-   * create a ClusterStatus instance 'status' from HBaseAdmin
-   * test : status.getLoad(server).getReplicationLoadSourceList()
+   * Test for HBASE-9531.
+   * <p/>
+   * put a few rows into htable1, which should be replicated to htable2 <br/>
+   * create a ClusterStatus instance 'status' from HBaseAdmin <br/>
+   * test : status.getLoad(server).getReplicationLoadSourceList() <br/>
    * test : status.getLoad(server).getReplicationLoadSink()
-   * * @throws Exception
    */
   @Test
   public void testReplicationStatus() throws Exception {
-    LOG.info("testReplicationStatus");
-    utility2.shutdownMiniHBaseCluster();
-    utility2.startMiniHBaseCluster(1,4);
-    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
-      // disable peer
-      admin.disablePeer(PEER_ID);
-
-      final byte[] qualName = Bytes.toBytes("q");
-      Put p;
-
-      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-        p = new Put(Bytes.toBytes("row" + i));
-        p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-        htable1.put(p);
-      }
-
-      ClusterStatus status = new ClusterStatus(hbaseAdmin.getClusterMetrics(
-        EnumSet.of(Option.LIVE_SERVERS)));
-
-      for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
-          .getRegionServerThreads()) {
-        ServerName server = thread.getRegionServer().getServerName();
-        ServerLoad sl = status.getLoad(server);
-        List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
-        ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink();
-
-        // check SourceList only has one entry, beacuse only has one peer
-        assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
-        assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
-
-        // check Sink exist only as it is difficult to verify the value on the fly
-        assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
-          (rLoadSink.getAgeOfLastAppliedOp() >= 0));
-        assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
-          (rLoadSink.getTimestampsOfLastAppliedOp() >= 0));
-      }
+    Admin hbaseAdmin = utility1.getAdmin();
+    // disable peer
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
 
-      // Stop rs1, then the queue of rs1 will be transfered to rs0
-      utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
-      Thread.sleep(10000);
-      status = new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-      ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
-      ServerLoad sl = status.getLoad(server);
-      List<ReplicationLoadSource> rLoadSourceList = sl.getReplicationLoadSourceList();
-      // check SourceList still only has one entry
-      assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
-      assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID());
-    } finally {
-      admin.enablePeer(PEER_ID);
-      utility1.getHBaseCluster().getRegionServer(1).start();
-    }
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    //we need to perform initialisations from TestReplicationBase.setUpBeforeClass() on each
-    //test here, so we override BeforeClass to do nothing and call
-    // TestReplicationBase.setUpBeforeClass() from setup method
-    TestReplicationBase.configureClusters();
-  }
-
-  @Before
-  @Override
-  public void setUpBase() throws Exception {
-    TestReplicationBase.startClusters();
-    super.setUpBase();
-  }
+    final byte[] qualName = Bytes.toBytes("q");
+    Put p;
 
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    utility2.shutdownMiniCluster();
-    utility1.shutdownMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass(){
-    //We need to override it here to avoid issues when trying to execute super class teardown
-  }
-
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(1, loadSources.size());
-    ReplicationLoadSource loadSource = loadSources.get(0);
-    assertFalse(loadSource.hasEditsSinceRestart());
-    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-    assertEquals(0, loadSource.getReplicationLag());
-    assertFalse(loadSource.isRecovered());
-  }
-
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    //add some values to source cluster
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
       htable1.put(p);
     }
-    Thread.sleep(10000);
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(1, loadSources.size());
-    ReplicationLoadSource loadSource = loadSources.get(0);
-    assertTrue(loadSource.hasEditsSinceRestart());
-    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-    assertTrue(loadSource.getReplicationLag()>0);
-    assertFalse(loadSource.isRecovered());
-  }
 
-  @Test
-  public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(2, loadSources.size());
-    boolean foundRecovery = false;
-    boolean foundNormal = false;
-    for(ReplicationLoadSource loadSource : loadSources){
-      if (loadSource.isRecovered()){
-        foundRecovery = true;
-        assertTrue(loadSource.hasEditsSinceRestart());
-        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-        assertTrue(loadSource.getReplicationLag()>0);
-      } else {
-        foundNormal = true;
-        assertFalse(loadSource.hasEditsSinceRestart());
-        assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-        assertEquals(0, loadSource.getReplicationLag());
-      }
-    }
-    assertTrue("No normal queue found.", foundNormal);
-    assertTrue("No recovery queue found.", foundRecovery);
-  }
+    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
 
-  @Test
-  public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    Admin hbaseAdmin = utility1.getConnection().getAdmin();
-    ServerName serverName = utility1.getHBaseCluster().
-        getRegionServer(0).getServerName();
-    Thread.sleep(10000);
-    //add more values to cluster 1, these should cause normal queue to lag
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
-    }
-    Thread.sleep(10000);
-    ClusterStatus status = new ClusterStatus(hbaseAdmin.
-        getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-    List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().
-        get(serverName).getReplicationLoadSourceList();
-    assertEquals(2, loadSources.size());
-    boolean foundRecovery = false;
-    boolean foundNormal = false;
-    for(ReplicationLoadSource loadSource : loadSources){
-      if (loadSource.isRecovered()){
-        foundRecovery = true;
-      } else {
-        foundNormal = true;
-      }
-      assertTrue(loadSource.hasEditsSinceRestart());
-      assertEquals(0, loadSource.getTimestampOfLastShippedOp());
-      assertTrue(loadSource.getReplicationLag()>0);
-    }
-    assertTrue("No normal queue found.", foundNormal);
-    assertTrue("No recovery queue found.", foundRecovery);
-  }
+    for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
+      .getRegionServerThreads()) {
+      ServerName server = thread.getRegionServer().getServerName();
+      ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+      List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
+      ReplicationLoadSink rLoadSink = sm.getReplicationLoadSink();
 
-  @Test
-  public void testReplicationStatusAfterLagging() throws Exception {
-    utility2.shutdownMiniHBaseCluster();
-    utility1.shutdownMiniHBaseCluster();
-    utility1.startMiniHBaseCluster();
-    //add some values to cluster 1
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      Put p = new Put(Bytes.toBytes("row" + i));
-      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
-      htable1.put(p);
+      // check SourceList only has one entry, because only has one peer
+      assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1));
+      assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
+
+      // check Sink exist only as it is difficult to verify the value on the fly
+      assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
+        (rLoadSink.getAgeOfLastAppliedOp() >= 0));
+      assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
+        (rLoadSink.getTimestampsOfLastAppliedOp() >= 0));
     }
-    utility2.startMiniHBaseCluster();
+
+    // Stop rs1, then the queue of rs1 will be transferred to rs0
+    utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
     Thread.sleep(10000);
-    try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
-      ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).
-          getServerName();
-      ClusterStatus status =
-          new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)));
-      List<ReplicationLoadSource> loadSources = status.getLiveServerMetrics().get(serverName).
-          getReplicationLoadSourceList();
-      assertEquals(1, loadSources.size());
-      ReplicationLoadSource loadSource = loadSources.get(0);
-      assertTrue(loadSource.hasEditsSinceRestart());
-      assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
-      assertEquals(0, loadSource.getReplicationLag());
-    }finally{
-      utility2.shutdownMiniHBaseCluster();
-    }
+    metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
+    List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
+    // check SourceList now has two entries: rs0's own queue plus the one transferred from rs1
+    assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2));
+    assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
new file mode 100644
index 0000000..1993d34
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusAfterLagging extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationStatusAfterLagging.class);
+
+  @Test
+  public void testReplicationStatusAfterLagging() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    restartHBaseCluster(utility1, NUM_SLAVES1);
+    // add some values to cluster 1
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    utility2.startMiniHBaseCluster();
+    Thread.sleep(10000);
+    Admin hbaseAdmin = utility1.getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+      metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+    assertEquals(1, loadSources.size());
+    ReplicationLoadSource loadSource = loadSources.get(0);
+    assertTrue(loadSource.hasEditsSinceRestart());
+    assertTrue(loadSource.getTimestampOfLastShippedOp() > 0);
+    assertEquals(0, loadSource.getReplicationLag());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
new file mode 100644
index 0000000..4bb41f8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationStatusBothNormalAndRecoveryLagging.class);
+
+  @Test
+  public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    // add some values to cluster 1
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    restartHBaseCluster(utility1, NUM_SLAVES1);
+    Admin hbaseAdmin = utility1.getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    // add more values to cluster 1, these should cause normal queue to lag
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+      metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+    assertEquals(2, loadSources.size());
+    boolean foundRecovery = false;
+    boolean foundNormal = false;
+    for (ReplicationLoadSource loadSource : loadSources) {
+      if (loadSource.isRecovered()) {
+        foundRecovery = true;
+      } else {
+        foundNormal = true;
+      }
+      assertTrue(loadSource.hasEditsSinceRestart());
+      assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+      assertTrue(loadSource.getReplicationLag() > 0);
+    }
+    assertTrue("No normal queue found.", foundNormal);
+    assertTrue("No recovery queue found.", foundRecovery);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
new file mode 100644
index 0000000..fb3f16b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedNewOp.class);
+
+  @Test
+  public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    restartHBaseCluster(utility1, NUM_SLAVES1);
+    Admin hbaseAdmin = utility1.getAdmin();
+    // add some values to source cluster
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
+      htable1.put(p);
+    }
+    Thread.sleep(10000);
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+      metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+    assertEquals(1, loadSources.size());
+    ReplicationLoadSource loadSource = loadSources.get(0);
+    assertTrue(loadSource.hasEditsSinceRestart());
+    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+    assertTrue(loadSource.getReplicationLag() > 0);
+    assertFalse(loadSource.isRecovered());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
new file mode 100644
index 0000000..76d12b2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedNoOps.class);
+
+  @Test
+  public void c() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    restartHBaseCluster(utility1, NUM_SLAVES1);
+    Admin hbaseAdmin = utility1.getAdmin();
+    ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> loadSources =
+      metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
+    assertEquals(1, loadSources.size());
+    ReplicationLoadSource loadSource = loadSources.get(0);
+    assertFalse(loadSource.hasEditsSinceRestart());
+    assertEquals(0, loadSource.getTimestampOfLastShippedOp());
+    assertEquals(0, loadSource.getReplicationLag());
+    assertFalse(loadSource.isRecovered());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
new file mode 100644
index 0000000..800fa82
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ClusterMetrics.Option;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Replication status of a restarted source that took writes while its target was stopped. */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
+    extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicationStatusSourceStartedTargetStoppedWithRecovery.class);
+
+  /** Expects two sources on the first region server: one normal queue and one recovery queue. */
+  @Test
+  public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
+    utility2.shutdownMiniHBaseCluster();
+    for (int row = 0; row < NB_ROWS_IN_BATCH; row++) { // put a batch of rows into cluster 1
+      Put put = new Put(Bytes.toBytes("row" + row));
+      htable1.put(put.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + row)));
+    }
+    Thread.sleep(10000);
+    restartHBaseCluster(utility1, NUM_SLAVES1);
+    Admin admin = utility1.getAdmin();
+    ServerName regionServer = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+    Thread.sleep(10000);
+    ClusterMetrics liveMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    List<ReplicationLoadSource> sources =
+      liveMetrics.getLiveServerMetrics().get(regionServer).getReplicationLoadSourceList();
+    assertEquals(2, sources.size());
+    boolean sawRecovered = false;
+    boolean sawNormal = false;
+    for (ReplicationLoadSource source : sources) {
+      if (!source.isRecovered()) {
+        sawNormal = true;
+        assertFalse(source.hasEditsSinceRestart());
+        assertEquals(0, source.getTimestampOfLastShippedOp());
+        assertEquals(0, source.getReplicationLag());
+        continue;
+      }
+      sawRecovered = true;
+      assertTrue(source.hasEditsSinceRestart());
+      assertEquals(0, source.getTimestampOfLastShippedOp());
+      assertTrue(source.getReplicationLag() > 0);
+    }
+    assertTrue("No normal queue found.", sawNormal);
+    assertTrue("No recovery queue found.", sawRecovered);
+  }
+}