You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by as...@apache.org on 2016/06/03 13:15:02 UTC

hbase git commit: HBASE-15888 Extend HBASE-12769 for bulk load data replication

Repository: hbase
Updated Branches:
  refs/heads/master 72d3f2a86 -> 0cbce0762


HBASE-15888 Extend HBASE-12769 for bulk load data replication


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0cbce076
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0cbce076
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0cbce076

Branch: refs/heads/master
Commit: 0cbce07626b77d9aa75a16f5e52c19428865dce7
Parents: 72d3f2a
Author: Ashish Singhi <as...@apache.org>
Authored: Fri Jun 3 18:42:00 2016 +0530
Committer: Ashish Singhi <as...@apache.org>
Committed: Fri Jun 3 18:42:00 2016 +0530

----------------------------------------------------------------------
 .../replication/ReplicationPeersZKImpl.java     |  6 ++
 .../hbase/util/hbck/ReplicationChecker.java     | 59 ++++++++++++++++++--
 2 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0cbce076/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 09d2100..15265d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -550,6 +550,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
           }
         }
       }
+      // Check for hfile-refs queue
+      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+        throw new ReplicationException("Undeleted queue for peerId: " + peerId
+            + ", found in hfile-refs node path " + hfileRefsZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0cbce076/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 89f2557..e472558 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class ReplicationChecker {
   private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+  private final ZooKeeperWatcher zkw;
   private final ErrorReporter errorReporter;
   private final ReplicationQueuesClient queuesClient;
   private final ReplicationPeers replicationPeers;
   private final ReplicationQueueDeletor queueDeletor;
   // replicator with its queueIds for removed peers
   private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
-  
+  // peer ids of removed peers that still have a queue under the hfile-refs znode
+  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
+  private final String hfileRefsZNode;
+
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
       ErrorReporter errorReporter) throws IOException {
     try {
+      this.zkw = zkw;
       this.errorReporter = errorReporter;
       this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
       this.queuesClient.init();
@@ -71,6 +76,13 @@ public class ReplicationChecker {
     } catch (ReplicationException e) {
       throw new IOException("failed to construct ReplicationChecker", e);
     }
+
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
+    String hfileRefsZNodeName =
+        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
   public boolean hasUnDeletedQueues() {
@@ -103,13 +115,37 @@ public class ReplicationChecker {
     } catch (KeeperException ke) {
       throw new IOException(ke);
     }
+
+    checkUnDeletedHFileRefsQueues(peerIds);
+  }
+
+  /** Reports hfile-refs queues whose peer is not in {@code peerIds} and records them. */
+  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
+    try {
+      if (ZKUtil.checkExists(zkw, hfileRefsZNode) == -1) {
+        return; // early exit on the -1 result; nothing is listed or reported
+      }
+      // Peers listed under hfile-refs minus the given peer ids are the leftovers.
+      Set<String> orphanedPeers = new HashSet<>(queuesClient.getAllPeersFromHFileRefsQueue());
+      orphanedPeers.removeAll(peerIds);
+      if (orphanedPeers.isEmpty()) {
+        return;
+      }
+      undeletedHFileRefsQueueIds.addAll(orphanedPeers);
+      errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+        "Undeleted replication hfile-refs queue for removed peer found: "
+            + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode);
+    } catch (KeeperException e) {
+      String failure = "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode;
+      throw new IOException(failure, e);
+    }
+  }
-  
+
   private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
     public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
       super(zk, conf, abortable);
     }
-    
+
     public void removeQueue(String replicator, String queueId) throws IOException {
       String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
         queueId);
@@ -122,7 +158,7 @@ public class ReplicationChecker {
       }
     }
   }
-  
+
   public void fixUnDeletedQueues() throws IOException {
     for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
       String replicator = replicatorAndQueueIds.getKey();
@@ -130,5 +166,20 @@ public class ReplicationChecker {
         queueDeletor.removeQueue(replicator, queueId);
       }
     }
+    fixUnDeletedHFileRefsQueue();
+  }
+
+  /** Deletes the hfile-refs znode of each removed peer in undeletedHFileRefsQueueIds. */
+  private void fixUnDeletedHFileRefsQueue() throws IOException {
+    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
+      try {
+        ZKUtil.deleteNodeRecursively(zkw, ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId));
+        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
+            + hfileRefsZNode);
+      } catch (KeeperException e) { // chain e so the ZooKeeper failure reason is not lost
+        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+            + " from path " + hfileRefsZNode, e);
+      }
+    }
+  }
 }