You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by as...@apache.org on 2016/06/09 13:15:20 UTC
hbase git commit: HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node
Repository: hbase
Updated Branches:
refs/heads/master 41cc21554 -> 9012a0b12
HBASE-15952 Bulk load data replication is not working when RS user does not have permission on hfile-refs node
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9012a0b1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9012a0b1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9012a0b1
Branch: refs/heads/master
Commit: 9012a0b123b3eea8b08c8687cef812e83e9b491d
Parents: 41cc215
Author: Ashish Singhi <as...@apache.org>
Authored: Thu Jun 9 18:44:29 2016 +0530
Committer: Ashish Singhi <as...@apache.org>
Committed: Thu Jun 9 18:44:29 2016 +0530
----------------------------------------------------------------------
.../replication/ReplicationPeersZKImpl.java | 21 -------------
.../hbase/replication/ReplicationQueues.java | 6 ++++
.../replication/ReplicationQueuesHBaseImpl.java | 6 ++++
.../replication/ReplicationQueuesZKImpl.java | 33 ++++++++++++++++----
.../regionserver/ReplicationSourceManager.java | 11 +++++--
.../cleaner/TestReplicationHFileCleaner.java | 1 +
.../replication/TestReplicationStateBasic.java | 5 +++
7 files changed, 53 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 15265d9..5af97c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -129,17 +129,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- // Irrespective of bulk load hfile replication is enabled or not we add peerId node to
- // hfile-refs node -- HBASE-15397
- try {
- String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
- LOG.info("Adding peer " + peerId + " to hfile reference queue.");
- ZKUtil.createWithParents(this.zookeeper, peerId);
- } catch (KeeperException e) {
- throw new ReplicationException("Failed to add peer with id=" + id
- + ", node under hfile references node.", e);
- }
-
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
ReplicationSerDeHelper.toByteArray(peerConfig));
@@ -166,16 +155,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
+ " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
- // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
- // replication is enabled or not
-
- String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
- try {
- LOG.info("Removing peer " + peerId + " from hfile reference queue.");
- ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
- } catch (NoNodeException e) {
- LOG.info("Did not find node " + peerId + " to delete.", e);
- }
} catch (KeeperException e) {
throw new ReplicationException("Could not remove peer with id=" + id, e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index db6da91..809b122 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -123,6 +123,12 @@ public interface ReplicationQueues {
void addPeerToHFileRefs(String peerId) throws ReplicationException;
/**
+ * Remove a peer from hfile reference queue.
+ * @param peerId peer cluster id to be removed
+ */
+ void removePeerFromHFileRefs(String peerId);
+
+ /**
* Add new hfile references to the queue.
* @param peerId peer cluster id to which the hfiles need to be replicated
* @param files list of hfile references to be added
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
index bbc9e32..29f0632 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesHBaseImpl.java
@@ -302,6 +302,12 @@ public class ReplicationQueuesHBaseImpl implements ReplicationQueues {
}
@Override
+ public void removePeerFromHFileRefs(String peerId) {
+ // TODO
+ throw new NotImplementedException();
+ }
+
+ @Override
public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
// TODO
throw new NotImplementedException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 32d0883..f03efff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -89,12 +89,14 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
- // Irrespective of bulk load hfile replication is enabled or not we add peerId node to
- // hfile-refs node -- HBASE-15397
- try {
- ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
- } catch (KeeperException e) {
- throw new ReplicationException("Could not initialize hfile references replication queue.", e);
+ if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize hfile references replication queue.",
+ e);
+ }
}
}
@@ -504,4 +506,23 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
e);
}
}
+
+ @Override
+ public void removePeerFromHFileRefs(String peerId) {
+ final String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+ try {
+ if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
+ }
+ return;
+ } else {
+ LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
+ ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
+ }
+ } catch (KeeperException e) {
+ LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
+ e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ed2eecc..e9330f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -115,6 +115,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final ThreadPoolExecutor executor;
private final Random rand;
+ private final boolean replicationForBulkLoadDataEnabled;
/**
@@ -166,6 +167,9 @@ public class ReplicationSourceManager implements ReplicationListener {
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
+ replicationForBulkLoadDataEnabled =
+ conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+ HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
/**
@@ -227,9 +231,6 @@ public class ReplicationSourceManager implements ReplicationListener {
* old region server wal queues
*/
protected void init() throws IOException, ReplicationException {
- boolean replicationForBulkLoadDataEnabled =
- conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
- HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
for (String id : this.replicationPeers.getPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
@@ -579,6 +580,7 @@ public class ReplicationSourceManager implements ReplicationListener {
@Override
public void peerRemoved(String peerId) {
removePeer(peerId);
+ this.replicationQueues.removePeerFromHFileRefs(peerId);
}
@Override
@@ -588,6 +590,9 @@ public class ReplicationSourceManager implements ReplicationListener {
boolean added = this.replicationPeers.peerAdded(id);
if (added) {
addSource(id);
+ if (replicationForBulkLoadDataEnabled) {
+ this.replicationQueues.addPeerToHFileRefs(id);
+ }
}
} catch (Exception e) {
LOG.error("Error while adding a new peer", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 1778e73..e5f1e69 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -111,6 +111,7 @@ public class TestReplicationHFileCleaner {
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()));
+ rq.addPeerToHFileRefs(peerId);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/9012a0b1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 5ab26ab..de5cc31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -204,6 +204,7 @@ public abstract class TestReplicationStateBasic {
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+ rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
@@ -225,7 +226,9 @@ public abstract class TestReplicationStateBasic {
rp.init();
rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+ rq1.addPeerToHFileRefs(ID_ONE);
rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+ rq1.addPeerToHFileRefs(ID_TWO);
List<String> files1 = new ArrayList<String>(3);
files1.add("file_1");
@@ -238,11 +241,13 @@ public abstract class TestReplicationStateBasic {
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_ONE);
+ rq1.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
rp.removePeer(ID_TWO);
+ rq1.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_TWO));
}