You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/08 07:27:09 UTC
[31/43] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 853bafb..24a4f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -166,7 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationTracker.registerListener(this);
- this.replicationPeers.getAllPeerIds();
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -270,8 +269,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
.map(ServerName::valueOf).collect(Collectors.toList());
- LOG.info(
- "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+ LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+ + otherRegionServers);
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
@@ -288,7 +287,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* The returned future is for adoptAbandonedQueues task.
*/
Future<?> init() throws IOException, ReplicationException {
- for (String id : this.replicationPeers.getConnectedPeerIds()) {
+ for (String id : this.replicationPeers.getAllPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@@ -307,8 +306,8 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
@VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
- ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
- ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+ ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+ ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
synchronized (this.walsById) {
this.sources.add(src);
@@ -354,7 +353,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void deleteSource(String peerId, boolean closeConnection) {
abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
if (closeConnection) {
- this.replicationPeers.peerDisconnected(peerId);
+ this.replicationPeers.removePeer(peerId);
}
}
@@ -445,12 +444,12 @@ public class ReplicationSourceManager implements ReplicationListener {
// update replication queues on ZK
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer
synchronized (replicationPeers) {
- for (String id : replicationPeers.getConnectedPeerIds()) {
+ for (String id : replicationPeers.getAllPeerIds()) {
try {
this.queueStorage.addWAL(server.getServerName(), id, logName);
} catch (ReplicationException e) {
- throw new IOException("Cannot add log to replication queue" +
- " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+ throw new IOException("Cannot add log to replication queue"
+ + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
}
}
}
@@ -593,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public void addPeer(String id) throws ReplicationException, IOException {
LOG.info("Trying to add peer, peerId: " + id);
- boolean added = this.replicationPeers.peerConnected(id);
+ boolean added = this.replicationPeers.addPeer(id);
if (added) {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id);
@@ -729,19 +728,25 @@ public class ReplicationSourceManager implements ReplicationListener {
// there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
String actualPeerId = replicationQueueInfo.getPeerId();
- ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
+
+ ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+ if (peer == null) {
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+ + ", peer is null");
+ abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
+ continue;
+ }
+
ReplicationPeerConfig peerConfig = null;
try {
- peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
- } catch (ReplicationException ex) {
- LOG.warn("Received exception while getting replication peer config, skipping replay"
- + ex);
- }
- if (peer == null || peerConfig == null) {
- LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+ peerConfig = replicationPeers.getPeerConfig(actualPeerId);
+ } catch (Exception e) {
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+ + ", failed to read peer config", e);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue;
}
+
// track sources in walsByIdRecoveredQueues
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -760,7 +765,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
- if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
+ if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src);
continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 8802e36..905ade4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -87,7 +87,7 @@ public class TestReplicationHFileCleaner {
server = new DummyServer();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
Replication.decorateMasterConfiguration(conf);
- rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+ rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
rp.init();
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
fs = FileSystem.get(conf);
@@ -101,7 +101,8 @@ public class TestReplicationHFileCleaner {
@Before
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
- rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+ rp.getPeerStorage().addPeer(peerId,
+ ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
rq.addPeerToHFileRefs(peerId);
}
@@ -112,7 +113,7 @@ public class TestReplicationHFileCleaner {
} catch (IOException e) {
LOG.warn("Failed to delete files recursively from path " + root);
}
- rp.unregisterPeer(peerId);
+ rp.getPeerStorage().removePeer(peerId);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index c57d9bb..ca4369e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -21,9 +21,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index f118ca3..fdfa6b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -73,10 +72,6 @@ public class TestReplicationTrackerZKImpl {
private ReplicationTracker rt;
private AtomicInteger rsRemovedCount;
private String rsRemovedData;
- private AtomicInteger plChangedCount;
- private List<String> plChangedData;
- private AtomicInteger peerRemovedCount;
- private String peerRemovedData;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -93,7 +88,7 @@ public class TestReplicationTrackerZKImpl {
String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
try {
ZKClusterId.setClusterId(zkw, new ClusterId());
- rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+ rp = ReplicationFactory.getReplicationPeers(zkw, conf);
rp.init();
rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
} catch (Exception e) {
@@ -101,10 +96,6 @@ public class TestReplicationTrackerZKImpl {
}
rsRemovedCount = new AtomicInteger(0);
rsRemovedData = "";
- plChangedCount = new AtomicInteger(0);
- plChangedData = new ArrayList<>();
- peerRemovedCount = new AtomicInteger(0);
- peerRemovedData = "";
}
@AfterClass
@@ -157,25 +148,22 @@ public class TestReplicationTrackerZKImpl {
@Test(timeout = 30000)
public void testPeerNameControl() throws Exception {
int exists = 0;
- int hyphen = 0;
- rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+ rp.getPeerStorage().addPeer("6",
+ ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
- try{
- rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
- }catch(IllegalArgumentException e){
- exists++;
+ try {
+ rp.getPeerStorage().addPeer("6",
+ ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+ } catch (ReplicationException e) {
+ if (e.getCause() instanceof KeeperException.NodeExistsException) {
+ exists++;
+ }
}
- try{
- rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
- }catch(IllegalArgumentException e){
- hyphen++;
- }
assertEquals(1, exists);
- assertEquals(1, hyphen);
// clean up
- rp.unregisterPeer("6");
+ rp.getPeerStorage().removePeer("6");
}
private class DummyReplicationListener implements ReplicationListener {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f6724df..4c737e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -380,7 +380,7 @@ public abstract class TestReplicationSourceManager {
}
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationPeers rp1 =
- ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+ ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
rp1.init();
NodeFailoverWorker w1 =
manager.new NodeFailoverWorker(server.getServerName());
@@ -561,7 +561,7 @@ public abstract class TestReplicationSourceManager {
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
- rp.registerPeer(peerId, peerConfig);
+ rp.getPeerStorage().addPeer(peerId, peerConfig, true);
try {
manager.addPeer(peerId);
} catch (Exception e) {
@@ -588,7 +588,7 @@ public abstract class TestReplicationSourceManager {
}
return true;
} else {
- return (rp.getConnectedPeer(peerId) != null);
+ return (rp.getPeer(peerId) != null);
}
});
}
@@ -600,8 +600,8 @@ public abstract class TestReplicationSourceManager {
*/
private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
- if (rp.getAllPeerIds().contains(peerId)) {
- rp.unregisterPeer(peerId);
+ if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
+ rp.getPeerStorage().removePeer(peerId);
try {
manager.removePeer(peerId);
} catch (Exception e) {
@@ -611,10 +611,9 @@ public abstract class TestReplicationSourceManager {
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
- List<String> peers = rp.getAllPeerIds();
- return (!manager.getAllQueues().contains(peerId)) &&
- (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
- manager.getSource(peerId) == null;
+ Collection<String> peers = rp.getPeerStorage().listPeerIds();
+ return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
+ && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca27ab15/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
index fc31c37..b755c32 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
@@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
/**
* Gets a ZKWatcher.
*/
- public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
- throws ZooKeeperConnectionException, IOException {
+ public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
boolean aborted = false;