You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:22:12 UTC
[05/20] hbase git commit: HBASE-20050 Reimplement
updateReplicationPositions logic in serial replication based on the newly
introduced replication storage layer
HBASE-20050 Reimplement updateReplicationPositions logic in serial replication based on the newly introduced replication storage layer
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d11cdb2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d11cdb2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d11cdb2
Branch: refs/heads/branch-2
Commit: 1d11cdb26cf3c713a4f0306e05baa0c5865501dd
Parents: 39c1ddc
Author: huzheng <op...@gmail.com>
Authored: Wed Feb 28 16:25:24 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationQueueStorage.java | 15 +++-
.../replication/ZKReplicationQueueStorage.java | 88 ++++++++++++++++++--
.../replication/TestReplicationStateBasic.java | 48 ++++++++++-
.../TestZKReplicationQueueStorage.java | 7 +-
.../regionserver/ReplicationSourceManager.java | 4 +-
5 files changed, 146 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index e774148..4c93da6 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -63,9 +64,19 @@ public interface ReplicationQueueStorage {
* @param queueId a String that identifies the queue
* @param fileName name of the WAL
* @param position the current position in the file
+ * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
*/
- void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
- throws ReplicationException;
+ void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
+ Map<String, Long> lastSeqIds) throws ReplicationException;
+
+ /**
+ * Read the max sequence id of the specific region for a given peer. For serial replication, we
+ * need the max sequence id to decide whether we can push the next entries.
+ * @param encodedRegionName the encoded region name
+ * @param peerId peer id
+ * @return the max sequence id of the specific region for a given peer.
+ */
+ long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index da96c65..adbf259 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -85,6 +87,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
+ "zookeeper.znode.replication.regions";
+ public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
+
/**
* The name of the znode that contains all replication queues
*/
@@ -95,6 +101,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
*/
private final String hfileRefsZNode;
+ private final String regionsZNode;
+
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
@@ -103,6 +111,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
+ this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
+ .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
private String getRsNode(ServerName serverName) {
@@ -121,6 +131,28 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
return getFileNode(getQueueNode(serverName, queueId), fileName);
}
+ /**
+ * Putting all regions directly under the /hbase/replication/regions znode would give it too many
+ * children, because of the huge number of regions in a real production environment. So here we
+ * use a hash of the encoded region name to spread the znodes over multiple parent znodes. <br>
+ * The final znode path has a format like this:
+ *
+ * <pre>
+ * /hbase/replication/regions/254/dd04e76a6966d4ffa908ed0586764767-100
+ * </pre>
+ *
+ * Here 254 is the hash of the encoded region name and 100 is the peer id.
+ * @param encodedRegionName the encoded region name.
+ * @param peerId peer id for replication.
+ * @return ZNode path to persist the max sequence id that we've pushed for the given region and
+ * peer.
+ */
+ private String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
+ int hash = encodedRegionName.hashCode() & 0x0000FFFF; // keep the low 16 bits: 0..65535
+ String hashPath = ZNodePaths.joinZNode(regionsZNode, String.valueOf(hash));
+ return ZNodePaths.joinZNode(hashPath, String.format("%s-%s", encodedRegionName, peerId));
+ }
+
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
try {
@@ -137,8 +169,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
try {
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException e) {
- throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName +
- ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
+ throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
+ + ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
@@ -157,15 +189,55 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
- public void setWALPosition(ServerName serverName, String queueId, String fileName, long position)
- throws ReplicationException {
+ public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
+ Map<String, Long> lastSeqIds) throws ReplicationException {
try {
- ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName),
- ZKUtil.positionToByteArray(position));
+ List<ZKUtilOp> listOfOps = new ArrayList<>();
+ listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+ ZKUtil.positionToByteArray(position)));
+ // Persist the max sequence id(s) of regions for serial replication atomically.
+ if (lastSeqIds != null && lastSeqIds.size() > 0) {
+ for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+ String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+ String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+ /*
+ * Make sure the existence of path
+ * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
+ * multiOrSequential() method said, if received a NodeExistsException, all operations will
+ * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
+ * because only need to make sure that update file position and sequence id atomically.
+ */
+ ZKUtil.createWithParents(zookeeper, path);
+ // Persist the max sequence id of region to zookeeper.
+ listOfOps
+ .add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+ }
+ }
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
- throw new ReplicationException("Failed to set log position (serverName=" + serverName +
- ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
+ throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
+ }
+ }
+
+ @Override
+ public long getLastSequenceId(String encodedRegionName, String peerId)
+ throws ReplicationException {
+ byte[] data;
+ try {
+ data =
+ ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
+ } catch (KeeperException | InterruptedException e) {
+ throw new ReplicationException("Failed to get the last sequence id(region="
+ + encodedRegionName + ", peerId=" + peerId + ")", e); // keep the cause for diagnosis
+ }
+ try {
+ return ZKUtil.parseWALPositionFrom(data);
+ } catch (DeserializationException de) {
+ LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
+ + "), data=" + Bytes.toStringBinary(data));
 }
+ return HConstants.NO_SEQNUM; // best effort: unparsable data is reported as "no sequence id"
 }
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fccffb5..5999c1f 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -26,6 +28,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
@@ -35,6 +38,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
/**
* White box testing for replication state interfaces. Implementations should extend this class, and
* initialize the interfaces properly.
@@ -122,7 +127,7 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
- rqs.setWALPosition(server3, "qId5", "filename4", 354L);
+ rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
@@ -270,6 +275,47 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(2);
}
+ private String getFileName(String base, int i) {
+ return String.format("%s-%04d", base, i); // base is data, never part of the format string
+ }
+
+ @Test // A WAL position and region sequence ids written in one call must both be readable.
+ public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
+ ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+ assertTrue(rqs.getAllQueues(serverName1).isEmpty());
+ String queue1 = "1"; // also passed as the peer id to getLastSequenceId() below
+ String region0 = "region0", region1 = "region1";
+ for (int i = 0; i < 10; i++) {
+ rqs.addWAL(serverName1, queue1, getFileName("file1", i));
+ }
+ List<String> queueIds = rqs.getAllQueues(serverName1);
+ assertEquals(1, queueIds.size());
+ assertThat(queueIds, hasItems("1"));
+ // All ten WALs must be listed in the queue.
+ List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
+ assertEquals(10, wals1.size());
+ for (int i = 0; i < 10; i++) {
+ assertThat(wals1, hasItems(getFileName("file1", i)));
+ }
+ // Before any update: every position is 0 and no sequence id is recorded.
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ }
+ assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
+ assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
+ // Write each position together with the sequence ids of both regions.
+ for (int i = 0; i < 10; i++) {
+ rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
+ ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
+ }
+ // Each WAL keeps its own position; the sequence ids hold the values of the last write (i = 9).
+ for (int i = 0; i < 10; i++) {
+ assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ }
+ assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+ assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+ }
+
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
// we can first check if the value was changed in the store, if it wasn't then fail right away
if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index 2c01a26..8ff52f3 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -127,7 +127,7 @@ public class TestZKReplicationQueueStorage {
List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(10, wals1.size());
- assertEquals(10, wals1.size());
+ assertEquals(10, wals2.size());
for (int i = 0; i < 10; i++) {
assertThat(wals1, hasItems(getFileName("file1", i)));
assertThat(wals2, hasItems(getFileName("file2", i)));
@@ -136,8 +136,9 @@ public class TestZKReplicationQueueStorage {
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
- STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100);
- STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10);
+ STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
+ STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
+ null);
}
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d11cdb2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d11dc8e..eb9dba2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -482,8 +482,8 @@ public class ReplicationSourceManager implements ReplicationListener {
 public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
 boolean queueRecovered) {
 String fileName = log.getName();
- abortWhenFail(
- () -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, position));
+ abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
+ position, null)); // null lastSeqIds: this call passes no per-region sequence ids
 cleanOldLogs(fileName, queueId, queueRecovered);
 }