You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/07 10:21:45 UTC

[44/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5b7bab8..ce9882a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -160,7 +160,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.clusterId = clusterId;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationTracker.registerListener(this);
-    this.replicationPeers.getAllPeerIds();
     // It's preferable to failover 1 RS at a time, but with good zk servers
     // more could be processed at the same time.
     int nbWorkers = conf.getInt("replication.executor.workers", 1);
@@ -260,8 +259,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
         .map(ServerName::valueOf).collect(Collectors.toList());
-    LOG.info(
-      "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+    LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+        + otherRegionServers);
 
     // Look if there's anything to process after a restart
     for (ServerName rs : currentReplicators) {
@@ -278,7 +277,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * The returned future is for adoptAbandonedQueues task.
    */
   Future<?> init() throws IOException, ReplicationException {
-    for (String id : this.replicationPeers.getConnectedPeerIds()) {
+    for (String id : this.replicationPeers.getAllPeerIds()) {
       addSource(id);
       if (replicationForBulkLoadDataEnabled) {
         // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
@@ -297,8 +296,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
-    ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
+    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
+    ReplicationPeer peer = replicationPeers.getPeer(id);
     ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
@@ -344,7 +343,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void deleteSource(String peerId, boolean closeConnection) {
     abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
     if (closeConnection) {
-      this.replicationPeers.peerDisconnected(peerId);
+      this.replicationPeers.removePeer(peerId);
     }
   }
 
@@ -437,12 +436,12 @@ public class ReplicationSourceManager implements ReplicationListener {
     // update replication queues on ZK
     // synchronize on replicationPeers to avoid adding source for the to-be-removed peer
     synchronized (replicationPeers) {
-      for (String id : replicationPeers.getConnectedPeerIds()) {
+      for (String id : replicationPeers.getAllPeerIds()) {
         try {
           this.queueStorage.addWAL(server.getServerName(), id, logName);
         } catch (ReplicationException e) {
-          throw new IOException("Cannot add log to replication queue" +
-            " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+          throw new IOException("Cannot add log to replication queue"
+              + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
         }
       }
     }
@@ -587,7 +586,7 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   public void addPeer(String id) throws ReplicationException, IOException {
     LOG.info("Trying to add peer, peerId: " + id);
-    boolean added = this.replicationPeers.peerConnected(id);
+    boolean added = this.replicationPeers.addPeer(id);
     if (added) {
       LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
       addSource(id);
@@ -723,16 +722,21 @@ public class ReplicationSourceManager implements ReplicationListener {
           // there is not an actual peer defined corresponding to peerId for the failover.
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
           String actualPeerId = replicationQueueInfo.getPeerId();
-          ReplicationPeer peer = replicationPeers.getConnectedPeer(actualPeerId);
+
+          ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
+          if (peer == null) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+                + ", peer is null");
+            abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
+            continue;
+          }
+
           ReplicationPeerConfig peerConfig = null;
           try {
-            peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
-          } catch (ReplicationException ex) {
-            LOG.warn("Received exception while getting replication peer config, skipping replay"
-                + ex);
-          }
-          if (peer == null || peerConfig == null) {
-            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+            peerConfig = replicationPeers.getPeerConfig(actualPeerId);
+          } catch (Exception e) {
+            LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
+                + ", failed to read peer config", e);
             abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
             continue;
           }
@@ -761,7 +765,7 @@ public class ReplicationSourceManager implements ReplicationListener {
           // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
           // see removePeer
           synchronized (oldsources) {
-            if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
+            if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
               src.terminate("Recovered queue doesn't belong to any current peer");
               closeRecoveredQueue(src);
               continue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index e1eb822..08dd428 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -94,7 +94,7 @@ public class TestReplicationHFileCleaner {
     server = new DummyServer();
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
-    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf);
     rp.init();
     rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
     fs = FileSystem.get(conf);
@@ -108,7 +108,8 @@ public class TestReplicationHFileCleaner {
   @Before
   public void setup() throws ReplicationException, IOException {
     root = TEST_UTIL.getDataTestDirOnTestFS();
-    rp.registerPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+    rp.getPeerStorage().addPeer(peerId,
+      ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
     rq.addPeerToHFileRefs(peerId);
   }
 
@@ -119,7 +120,7 @@ public class TestReplicationHFileCleaner {
     } catch (IOException e) {
       LOG.warn("Failed to delete files recursively from path " + root);
     }
-    rp.unregisterPeer(peerId);
+    rp.getPeerStorage().removePeer(peerId);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index df2b97c..f30c48d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index c18996f..a0245d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -91,7 +92,7 @@ public class TestReplicationTrackerZKImpl {
     String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234");
     try {
       ZKClusterId.setClusterId(zkw, new ClusterId());
-      rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
+      rp = ReplicationFactory.getReplicationPeers(zkw, conf);
       rp.init();
       rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
     } catch (Exception e) {
@@ -151,25 +152,22 @@ public class TestReplicationTrackerZKImpl {
   @Test
   public void testPeerNameControl() throws Exception {
     int exists = 0;
-    int hyphen = 0;
-    rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
+    rp.getPeerStorage().addPeer("6",
+      ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
 
-    try{
-      rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      exists++;
+    try {
+      rp.getPeerStorage().addPeer("6",
+        ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+    } catch (ReplicationException e) {
+      if (e.getCause() instanceof KeeperException.NodeExistsException) {
+        exists++;
+      }
     }
 
-    try{
-      rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()));
-    }catch(IllegalArgumentException e){
-      hyphen++;
-    }
     assertEquals(1, exists);
-    assertEquals(1, hyphen);
 
     // clean up
-    rp.unregisterPeer("6");
+    rp.getPeerStorage().removePeer("6");
   }
 
   private class DummyReplicationListener implements ReplicationListener {

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 1d3b6ea..77b2fb2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -389,7 +389,7 @@ public abstract class TestReplicationSourceManager {
     }
     Server s1 = new DummyServer("dummyserver1.example.org");
     ReplicationPeers rp1 =
-        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
+        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration());
     rp1.init();
     NodeFailoverWorker w1 =
         manager.new NodeFailoverWorker(server.getServerName());
@@ -585,7 +585,7 @@ public abstract class TestReplicationSourceManager {
   private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    rp.registerPeer(peerId, peerConfig);
+    rp.getPeerStorage().addPeer(peerId, peerConfig, true);
     try {
       manager.addPeer(peerId);
     } catch (Exception e) {
@@ -612,7 +612,7 @@ public abstract class TestReplicationSourceManager {
         }
         return true;
       } else {
-        return (rp.getConnectedPeer(peerId) != null);
+        return (rp.getPeer(peerId) != null);
       }
     });
   }
@@ -624,8 +624,8 @@ public abstract class TestReplicationSourceManager {
    */
   private void removePeerAndWait(final String peerId) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
-    if (rp.getAllPeerIds().contains(peerId)) {
-      rp.unregisterPeer(peerId);
+    if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
+      rp.getPeerStorage().removePeer(peerId);
       try {
         manager.removePeer(peerId);
       } catch (Exception e) {
@@ -635,10 +635,9 @@ public abstract class TestReplicationSourceManager {
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
-        List<String> peers = rp.getAllPeerIds();
-        return (!manager.getAllQueues().contains(peerId)) &&
-          (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
-          manager.getSource(peerId) == null;
+        Collection<String> peers = rp.getPeerStorage().listPeerIds();
+        return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
+            && (!peers.contains(peerId)) && manager.getSource(peerId) == null;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bab8c37f/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
index fc31c37..b755c32 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java
@@ -182,8 +182,7 @@ public class HBaseZKTestingUtility extends HBaseCommonTestingUtility {
   /**
    * Gets a ZKWatcher.
    */
-  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil)
-      throws ZooKeeperConnectionException, IOException {
+  public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) throws IOException {
     ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() {
       boolean aborted = false;