You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/11/28 10:15:34 UTC

hbase git commit: HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup

Repository: hbase
Updated Branches:
  refs/heads/branch-2 ed1666720 -> 8b6f305ac


HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b6f305a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b6f305a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b6f305a

Branch: refs/heads/branch-2
Commit: 8b6f305ac7f975bfde2b48ac211d40340d04e9ce
Parents: ed16667
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Nov 28 18:06:38 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Tue Nov 28 18:12:55 2017 +0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSourceManager.java  |  10 +-
 .../replication/ReplicationSourceDummy.java     |   8 +-
 .../TestReplicationSourceManager.java           | 117 +++++++++++++++----
 ...tTableBasedReplicationSourceManagerImpl.java |   7 ++
 4 files changed, 115 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 45d7d94..3aa3843 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -305,6 +304,13 @@ public class ReplicationSourceManager implements ReplicationListener {
     return src;
   }
 
+  @VisibleForTesting
+  int getSizeOfLatestPath() {
+    synchronized (latestPaths) {
+      return latestPaths.size();
+    }
+  }
+
   /**
    * Delete a complete queue of wals associated with a peer cluster
    * @param peerId Id of the peer cluster queue of wals to delete

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a12cebd..7ea79f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +45,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   Path currentPath;
   MetricsSource metrics;
   WALFileLengthProvider walFileLengthProvider;
-
+  AtomicBoolean startup = new AtomicBoolean(false);
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
       ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
@@ -70,7 +70,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public void startup() {
+    startup.set(true);
+  }
 
+  public boolean isStartup() {
+    return startup.get();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a104d4f..83dc636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -39,7 +39,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-
+import java.util.stream.Collectors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -77,8 +77,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -117,6 +119,8 @@ public abstract class TestReplicationSourceManager {
 
   protected static ReplicationSourceManager manager;
 
+  protected static ReplicationSourceManager managerOfCluster;
+
   protected static ZKWatcher zkw;
 
   protected static HTableDescriptor htd;
@@ -170,9 +174,14 @@ public abstract class TestReplicationSourceManager {
     logDir = new Path(utility.getDataTestDir(),
         HConstants.HREGION_LOGDIR_NAME);
     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
-    manager = replication.getReplicationManager();
 
+    managerOfCluster = getManagerFromCluster();
+    manager = replication.getReplicationManager();
     manager.addSource(slaveId);
+    if (managerOfCluster != null) {
+      waitPeer(slaveId, managerOfCluster, true);
+    }
+    waitPeer(slaveId, manager, true);
 
     htd = new HTableDescriptor(test);
     HColumnDescriptor col = new HColumnDescriptor(f1);
@@ -189,9 +198,25 @@ public abstract class TestReplicationSourceManager {
     hri = new HRegionInfo(htd.getTableName(), r1, r2);
   }
 
+  private static ReplicationSourceManager getManagerFromCluster() {
+    // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
+    if (utility.getMiniHBaseCluster() == null) {
+      return null;
+    }
+    return utility.getMiniHBaseCluster().getRegionServerThreads()
+        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+        .findAny()
+        .map(HRegionServer::getReplicationSourceService)
+        .map(r -> (Replication)r)
+        .map(Replication::getReplicationManager)
+        .get();
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    manager.join();
+    if (manager != null) {
+      manager.join();
+    }
     utility.shutdownMiniCluster();
   }
 
@@ -213,6 +238,14 @@ public abstract class TestReplicationSourceManager {
   public void tearDown() throws Exception {
     LOG.info("End " + testName.getMethodName());
     cleanLogDir();
+    List<String> ids = manager.getSources().stream()
+        .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
+    for (String id : ids) {
+      if (slaveId.equals(id)) {
+        continue;
+      }
+      removePeerAndWait(id);
+    }
   }
 
   @Test
@@ -471,28 +504,50 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
+  private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
+    ReplicationSourceInterface source = manager.getSource(slaveId);
+    // Retrieve the global replication metrics source
+    Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
+    f.setAccessible(true);
+    return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
+  }
+
+  private static long getSizeOfLatestPath() {
+    // If no mini cluster is running, no extra replication managers influence the metrics.
+    if (utility.getMiniHBaseCluster() == null) {
+      return 0;
+    }
+    return utility.getMiniHBaseCluster().getRegionServerThreads()
+        .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+        .map(HRegionServer::getReplicationSourceService)
+        .map(r -> (Replication)r)
+        .map(Replication::getReplicationManager)
+        .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
+        .sum();
+  }
+
   @Test
   public void testRemovePeerMetricsCleanup() throws Exception {
     final String peerId = "DummyPeer";
     final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
         .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
     try {
+      MetricsReplicationSourceSource globalSource = getGlobalSource();
+      final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
+      final long sizeOfLatestPath = getSizeOfLatestPath();
       addPeerAndWait(peerId, peerConfig, true);
-
+      assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
+          globalSource.getSizeOfLogQueue());
       ReplicationSourceInterface source = manager.getSource(peerId);
       // Sanity check
       assertNotNull(source);
-      // Retrieve the global replication metrics source
-      Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
-      f.setAccessible(true);
-      MetricsReplicationSourceSource globalSource =
-          (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
-      int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
-
+      final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
       // Enqueue log and check if metrics updated
       source.enqueueLog(new Path("abc"));
-      assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
-      assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+      assertEquals(1 + sizeOfSingleLogQueue,
+          source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+              + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
 
       // Removing the peer should reset the global metrics
       removePeerAndWait(peerId);
@@ -502,8 +557,9 @@ public abstract class TestReplicationSourceManager {
       addPeerAndWait(peerId, peerConfig, true);
       source = manager.getSource(peerId);
       assertNotNull(source);
-      assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
-      assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+      assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
+      assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+          + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
     } finally {
       removePeerAndWait(peerId);
     }
@@ -520,13 +576,27 @@ public abstract class TestReplicationSourceManager {
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
     rp.registerPeer(peerId, peerConfig);
-    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
-      @Override public boolean evaluate() throws Exception {
-        if (waitForSource) {
-          return (manager.getSource(peerId) != null);
-        } else {
-          return (rp.getConnectedPeer(peerId) != null);
+    waitPeer(peerId, manager, waitForSource);
+    if (managerOfCluster != null) {
+      waitPeer(peerId, managerOfCluster, waitForSource);
+    }
+  }
+
+  private static void waitPeer(final String peerId,
+      ReplicationSourceManager manager, final boolean waitForSource) {
+    ReplicationPeers rp = manager.getReplicationPeers();
+    Waiter.waitFor(conf, 20000, () -> {
+      if (waitForSource) {
+        ReplicationSourceInterface rs = manager.getSource(peerId);
+        if (rs == null) {
+          return false;
+        }
+        if (rs instanceof ReplicationSourceDummy) {
+          return ((ReplicationSourceDummy)rs).isStartup();
         }
+        return true;
+      } else {
+        return (rp.getConnectedPeer(peerId) != null);
       }
     });
   }
@@ -545,7 +615,8 @@ public abstract class TestReplicationSourceManager {
       @Override public boolean evaluate() throws Exception {
         List<String> peers = rp.getAllPeerIds();
         return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
-            && (!peers.contains(peerId));
+            && (!peers.contains(peerId))
+            && manager.getSource(peerId) == null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
index e606257..19457e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -50,6 +53,10 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS
       TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
     utility = new HBaseTestingUtility(conf);
     utility.startMiniCluster();
+    Waiter.waitFor(conf, 3 * 1000,
+      () -> utility.getMiniHBaseCluster().getMaster().isInitialized());
+    utility.waitUntilAllRegionsAssigned(TableName.valueOf(
+        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
     setupZkAndReplication();
   }