You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/11/28 10:15:34 UTC
hbase git commit: HBASE-19342 fix
TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup
Repository: hbase
Updated Branches:
refs/heads/branch-2 ed1666720 -> 8b6f305ac
HBASE-19342 fix TestTableBasedReplicationSourceManagerImpl#testRemovePeerMetricsCleanup
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b6f305a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b6f305a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b6f305a
Branch: refs/heads/branch-2
Commit: 8b6f305ac7f975bfde2b48ac211d40340d04e9ce
Parents: ed16667
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Nov 28 18:06:38 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Tue Nov 28 18:12:55 2017 +0800
----------------------------------------------------------------------
.../regionserver/ReplicationSourceManager.java | 10 +-
.../replication/ReplicationSourceDummy.java | 8 +-
.../TestReplicationSourceManager.java | 117 +++++++++++++++----
...tTableBasedReplicationSourceManagerImpl.java | 7 ++
4 files changed, 115 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 45d7d94..3aa3843 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -40,7 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -305,6 +304,13 @@ public class ReplicationSourceManager implements ReplicationListener {
return src;
}
+ @VisibleForTesting
+ int getSizeOfLatestPath() {
+ synchronized (latestPaths) {
+ return latestPaths.size();
+ }
+ }
+
/**
* Delete a complete queue of wals associated with a peer cluster
* @param peerId Id of the peer cluster queue of wals to delete
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a12cebd..7ea79f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
-
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,7 +45,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
Path currentPath;
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
-
+ AtomicBoolean startup = new AtomicBoolean(false);
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
@@ -70,7 +70,11 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void startup() {
+ startup.set(true);
+ }
+ public boolean isStartup() {
+ return startup.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a104d4f..83dc636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -39,7 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-
+import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -77,8 +77,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -117,6 +119,8 @@ public abstract class TestReplicationSourceManager {
protected static ReplicationSourceManager manager;
+ protected static ReplicationSourceManager managerOfCluster;
+
protected static ZKWatcher zkw;
protected static HTableDescriptor htd;
@@ -170,9 +174,14 @@ public abstract class TestReplicationSourceManager {
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
- manager = replication.getReplicationManager();
+ managerOfCluster = getManagerFromCluster();
+ manager = replication.getReplicationManager();
manager.addSource(slaveId);
+ if (managerOfCluster != null) {
+ waitPeer(slaveId, managerOfCluster, true);
+ }
+ waitPeer(slaveId, manager, true);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1);
@@ -189,9 +198,25 @@ public abstract class TestReplicationSourceManager {
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
+ private static ReplicationSourceManager getManagerFromCluster() {
+ // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster.
+ if (utility.getMiniHBaseCluster() == null) {
+ return null;
+ }
+ return utility.getMiniHBaseCluster().getRegionServerThreads()
+ .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+ .findAny()
+ .map(HRegionServer::getReplicationSourceService)
+ .map(r -> (Replication)r)
+ .map(Replication::getReplicationManager)
+ .get();
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
- manager.join();
+ if (manager != null) {
+ manager.join();
+ }
utility.shutdownMiniCluster();
}
@@ -213,6 +238,14 @@ public abstract class TestReplicationSourceManager {
public void tearDown() throws Exception {
LOG.info("End " + testName.getMethodName());
cleanLogDir();
+ List<String> ids = manager.getSources().stream()
+ .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList());
+ for (String id : ids) {
+ if (slaveId.equals(id)) {
+ continue;
+ }
+ removePeerAndWait(id);
+ }
}
@Test
@@ -471,28 +504,50 @@ public abstract class TestReplicationSourceManager {
}
}
+ private static MetricsReplicationSourceSource getGlobalSource() throws Exception {
+ ReplicationSourceInterface source = manager.getSource(slaveId);
+ // Retrieve the global replication metrics source
+ Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
+ f.setAccessible(true);
+ return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
+ }
+
+ private static long getSizeOfLatestPath() {
+ // If no mini cluster is running, there are extra replication manager influencing the metrics.
+ if (utility.getMiniHBaseCluster() == null) {
+ return 0;
+ }
+ return utility.getMiniHBaseCluster().getRegionServerThreads()
+ .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer)
+ .map(HRegionServer::getReplicationSourceService)
+ .map(r -> (Replication)r)
+ .map(Replication::getReplicationManager)
+ .mapToLong(ReplicationSourceManager::getSizeOfLatestPath)
+ .sum();
+ }
+
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
try {
+ MetricsReplicationSourceSource globalSource = getGlobalSource();
+ final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
+ final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
-
+ assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
+ globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
- // Retrieve the global replication metrics source
- Field f = MetricsSource.class.getDeclaredField("globalSourceSource");
- f.setAccessible(true);
- MetricsReplicationSourceSource globalSource =
- (MetricsReplicationSourceSource)f.get(source.getSourceMetrics());
- int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
-
+ final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
- assertEquals(1, source.getSourceMetrics().getSizeOfLogQueue());
- assertEquals(1 + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+ assertEquals(1 + sizeOfSingleLogQueue,
+ source.getSourceMetrics().getSizeOfLogQueue());
+ assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
@@ -502,8 +557,9 @@ public abstract class TestReplicationSourceManager {
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
- assertEquals(0, source.getSourceMetrics().getSizeOfLogQueue());
- assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+ assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
+ assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
+ + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
@@ -520,13 +576,27 @@ public abstract class TestReplicationSourceManager {
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.registerPeer(peerId, peerConfig);
- Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
- if (waitForSource) {
- return (manager.getSource(peerId) != null);
- } else {
- return (rp.getConnectedPeer(peerId) != null);
+ waitPeer(peerId, manager, waitForSource);
+ if (managerOfCluster != null) {
+ waitPeer(peerId, managerOfCluster, waitForSource);
+ }
+ }
+
+ private static void waitPeer(final String peerId,
+ ReplicationSourceManager manager, final boolean waitForSource) {
+ ReplicationPeers rp = manager.getReplicationPeers();
+ Waiter.waitFor(conf, 20000, () -> {
+ if (waitForSource) {
+ ReplicationSourceInterface rs = manager.getSource(peerId);
+ if (rs == null) {
+ return false;
+ }
+ if (rs instanceof ReplicationSourceDummy) {
+ return ((ReplicationSourceDummy)rs).isStartup();
}
+ return true;
+ } else {
+ return (rp.getConnectedPeer(peerId) != null);
}
});
}
@@ -545,7 +615,8 @@ public abstract class TestReplicationSourceManager {
@Override public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
- && (!peers.contains(peerId));
+ && (!peers.contains(peerId))
+ && manager.getSource(peerId) == null;
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b6f305a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
index e606257..19457e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestTableBasedReplicationSourceManagerImpl.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -50,6 +53,10 @@ public class TestTableBasedReplicationSourceManagerImpl extends TestReplicationS
TableBasedReplicationQueuesClientImpl.class, ReplicationQueuesClient.class);
utility = new HBaseTestingUtility(conf);
utility.startMiniCluster();
+ Waiter.waitFor(conf, 3 * 1000,
+ () -> utility.getMiniHBaseCluster().getMaster().isInitialized());
+ utility.waitUntilAllRegionsAssigned(TableName.valueOf(
+ NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"));
setupZkAndReplication();
}