You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text copy.)
Posted to commits@hbase.apache.org by op...@apache.org on 2018/03/26 13:09:54 UTC
hbase git commit: HBASE-20138 Find a way to deal with the conflicts when updating replication position
Repository: hbase
Updated Branches:
refs/heads/master e9701a059 -> 7a1d00c7a
HBASE-20138 Find a way to deal with the conflicts when updating replication position
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7a1d00c7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7a1d00c7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7a1d00c7
Branch: refs/heads/master
Commit: 7a1d00c7a0719ccb8d97b42965835f124ef27804
Parents: e9701a0
Author: huzheng <op...@gmail.com>
Authored: Wed Mar 21 17:34:10 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Mon Mar 26 21:07:22 2018 +0800
----------------------------------------------------------------------
.../replication/ZKReplicationQueueStorage.java | 110 +++++++++++++------
.../replication/TestReplicationStateBasic.java | 6 +
.../TestZKReplicationQueueStorage.java | 48 +++++++-
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 34 ++++--
.../hadoop/hbase/zookeeper/TestZKUtil.java | 25 +++++
5 files changed, 178 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index e5a498a..19986f1 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -204,20 +204,25 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
- List<ZKUtilOp> listOfOps) throws KeeperException {
+ List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
+ String peerId = new ReplicationQueueInfo(queueId).getPeerId();
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
- String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
- /*
- * Make sure the existence of path
- * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
- * multiOrSequential() method said, if received a NodeExistsException, all operations will
- * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
- * because only need to make sure that update file position and sequence id atomically.
- */
- ZKUtil.createWithParents(zookeeper, path);
- // Persist the max sequence id of region to zookeeper.
- listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+ Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
+ byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
+ if (p.getSecond() < 0) { // ZNode does not exist.
+ ZKUtil.createWithParents(zookeeper,
+ path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
+ listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
+ continue;
+ }
+ // Perform CAS in a specific version v0 (HBASE-20138)
+ int v0 = p.getSecond();
+ long lastPushedSeqId = p.getFirst();
+ if (lastSeqEntry.getValue() <= lastPushedSeqId) {
+ continue;
+ }
+ listOfOps.add(ZKUtilOp.setData(path, data, v0));
}
}
@@ -225,50 +230,85 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
try {
- List<ZKUtilOp> listOfOps = new ArrayList<>();
- if (position > 0) {
- listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
- ZKUtil.positionToByteArray(position)));
+ for (int retry = 0;; retry++) {
+ List<ZKUtilOp> listOfOps = new ArrayList<>();
+ if (position > 0) {
+ listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+ ZKUtil.positionToByteArray(position)));
+ }
+ // Persist the max sequence id(s) of regions for serial replication atomically.
+ addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
+ if (listOfOps.isEmpty()) {
+ return;
+ }
+ try {
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+ return;
+ } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
+ LOG.warn(
+ "Bad version(or node exist) when persist the last pushed sequence id to zookeeper storage, "
+ + "Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId
+ + ", fileName=" + fileName);
+ }
}
- // Persist the max sequence id(s) of regions for serial replication atomically.
- addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
- ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
}
}
- @Override
- public long getLastSequenceId(String encodedRegionName, String peerId)
- throws ReplicationException {
- byte[] data;
- try {
- data =
- ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
- } catch (KeeperException | InterruptedException e) {
- throw new ReplicationException("Failed to get the last sequence id(region="
- + encodedRegionName + ", peerId=" + peerId + ")");
+ /**
+ * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
+ * that the ZNode does not exist.
+ */
+ @VisibleForTesting
+ protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
+ String peerId) throws KeeperException {
+ Stat stat = new Stat();
+ String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
+ byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
+ if (data == null) {
+ // ZNode does not exist, so just return version -1 to indicate that no node exist.
+ return Pair.newPair(HConstants.NO_SEQNUM, -1);
}
try {
- return ZKUtil.parseWALPositionFrom(data);
+ return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
} catch (DeserializationException de) {
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
+ "), data=" + Bytes.toStringBinary(data));
}
- return HConstants.NO_SEQNUM;
+ return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
+ }
+
+ @Override
+ public long getLastSequenceId(String encodedRegionName, String peerId)
+ throws ReplicationException {
+ try {
+ return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
+ + encodedRegionName + ", peerId=" + peerId + ")", e);
+ }
}
@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {
try {
+ // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
+ // only, so no conflict happen.
List<ZKUtilOp> listOfOps = new ArrayList<>();
- addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
- ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+ for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+ String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+ ZKUtil.createWithParents(zookeeper, path);
+ listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+ }
+ if (!listOfOps.isEmpty()) {
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+ }
} catch (KeeperException e) {
- throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
- ", lastSeqIds.size=" + lastSeqIds.size(), e);
+ throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
+ + ", size of lastSeqIds=" + lastSeqIds.size(), e);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 3ed4121..437804c 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -316,6 +316,12 @@ public abstract class TestReplicationStateBasic {
}
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+
+ // Try to decrease the last pushed id by setWALPosition method.
+ rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
+ ImmutableMap.of(region0, 899L, region1, 1001L));
+ assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+ assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
}
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index ca86a05..5821271 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
@@ -44,6 +46,8 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {
@@ -215,10 +219,11 @@ public class TestZKReplicationQueueStorage {
assertEquals(1, v1 - v0);
}
- private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
+ private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
private int called = 0;
+ private int getLastSeqIdOpIndex = 0;
@Override
protected int getQueuesZNodeCversion() throws KeeperException {
@@ -227,12 +232,26 @@ public class TestZKReplicationQueueStorage {
}
return called;
}
+
+ @Override
+ protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
+ String peerId) throws KeeperException {
+ Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
+ if (getLastSeqIdOpIndex < 100) {
+ // Let the ZNode version increase.
+ String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
+ ZKUtil.createWithParents(zookeeper, path);
+ ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
+ }
+ getLastSeqIdOpIndex++;
+ return oldPair;
+ }
};
}
@Test
public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
- ZKReplicationQueueStorage storage = createWithUnstableCversion();
+ ZKReplicationQueueStorage storage = createWithUnstableVersion();
storage.addWAL(getServerName(0), "1", "file");
// This should return eventually when cversion stabilizes
Set<String> allWals = storage.getAllWALs();
@@ -243,7 +262,7 @@ public class TestZKReplicationQueueStorage {
// For HBASE-14621
@Test
public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
- ZKReplicationQueueStorage storage = createWithUnstableCversion();
+ ZKReplicationQueueStorage storage = createWithUnstableVersion();
storage.addPeerToHFileRefs("1");
Path p = new Path("/test");
storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
@@ -253,6 +272,29 @@ public class TestZKReplicationQueueStorage {
assertThat(allHFileRefs, hasItems("test"));
}
+ // For HBASE-20138
+ @Test
+ public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
+ ZKReplicationQueueStorage storage = createWithUnstableVersion();
+ ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
+ assertTrue(storage.getAllQueues(serverName1).isEmpty());
+ String queue1 = "1";
+ String fileName = getFileName("file1", 0);
+ String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
+ storage.addWAL(serverName1, queue1, fileName);
+
+ List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
+ assertEquals(1, wals1.size());
+
+ assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
+ // This should return eventually when data version stabilizes
+ storage.setWALPosition(serverName1, queue1, fileName, 100,
+ ImmutableMap.of(encodedRegionName, 120L));
+
+ assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
+ assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
+ }
+
@Test
public void testRegionsZNodeLayout() throws Exception {
String peerId = "1";
http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 7bb4752..75ad0cb 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1500,11 +1500,18 @@ public final class ZKUtil {
/**
* @return a setData ZKUtilOp
*/
- public static ZKUtilOp setData(String path, byte [] data) {
+ public static ZKUtilOp setData(String path, byte[] data) {
return new SetData(path, data);
}
/**
+ * @return a setData ZKUtilOp
+ */
+ public static ZKUtilOp setData(String path, byte[] data, int version) {
+ return new SetData(path, data, version);
+ }
+
+ /**
* @return path to znode where the ZKOp will occur
*/
public String getPath() {
@@ -1578,17 +1585,28 @@ public final class ZKUtil {
* ZKUtilOp representing setData in ZooKeeper
*/
public static final class SetData extends ZKUtilOp {
- private byte [] data;
+ private byte[] data;
+ private int version = -1;
+
+ private SetData(String path, byte[] data) {
+ super(path);
+ this.data = data;
+ }
- private SetData(String path, byte [] data) {
+ private SetData(String path, byte[] data, int version) {
super(path);
this.data = data;
+ this.version = version;
}
public byte[] getData() {
return data;
}
+ public int getVersion() {
+ return version;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -1599,13 +1617,15 @@ public final class ZKUtil {
}
SetData op = (SetData) o;
- return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+ return getPath().equals(op.getPath()) && Arrays.equals(data, op.data)
+ && getVersion() == op.getVersion();
}
@Override
public int hashCode() {
int ret = getPath().hashCode();
- return ret * 31 + Bytes.hashCode(data);
+ ret = ret * 31 + Bytes.hashCode(data);
+ return ret * 31 + Integer.hashCode(version);
}
}
}
@@ -1626,8 +1646,8 @@ public final class ZKUtil {
DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
return Op.delete(dnfs.getPath(), -1);
} else if (op instanceof SetData) {
- SetData sd = (SetData)op;
- return Op.setData(sd.getPath(), sd.getData(), -1);
+ SetData sd = (SetData) op;
+ return Op.setData(sd.getPath(), sd.getData(), sd.getVersion());
} else {
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
+ op.getClass().getName());
http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 6c3279a..1508441 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.zookeeper;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -46,6 +48,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ZKTests.class, MediumTests.class })
@@ -117,6 +120,28 @@ public class TestZKUtil {
assertNull(ZKUtil.getDataNoWatch(ZKW, "/l1/l2", null));
}
+ private int getZNodeDataVersion(String znode) throws KeeperException {
+ Stat stat = new Stat();
+ ZKUtil.getDataNoWatch(ZKW, znode, stat);
+ return stat.getVersion();
+ }
+
+ @Test
+ public void testSetDataWithVersion() throws Exception {
+ ZKUtil.createWithParents(ZKW, "/s1/s2/s3");
+ int v0 = getZNodeDataVersion("/s1/s2/s3");
+ assertEquals(0, v0);
+
+ ZKUtil.setData(ZKW, "/s1/s2/s3", Bytes.toBytes(12L));
+ int v1 = getZNodeDataVersion("/s1/s2/s3");
+ assertEquals(1, v1);
+
+ ZKUtil.multiOrSequential(ZKW,
+ ImmutableList.of(ZKUtilOp.setData("/s1/s2/s3", Bytes.toBytes(13L), v1)), false);
+ int v2 = getZNodeDataVersion("/s1/s2/s3");
+ assertEquals(2, v2);
+ }
+
/**
* A test for HBASE-3238
* @throws IOException A connection attempt to zk failed