You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/03/26 13:09:54 UTC

hbase git commit: HBASE-20138 Find a way to deal with the conflicts when updating replication position

Repository: hbase
Updated Branches:
  refs/heads/master e9701a059 -> 7a1d00c7a


HBASE-20138 Find a way to deal with the conflicts when updating replication position


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7a1d00c7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7a1d00c7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7a1d00c7

Branch: refs/heads/master
Commit: 7a1d00c7a0719ccb8d97b42965835f124ef27804
Parents: e9701a0
Author: huzheng <op...@gmail.com>
Authored: Wed Mar 21 17:34:10 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Mon Mar 26 21:07:22 2018 +0800

----------------------------------------------------------------------
 .../replication/ZKReplicationQueueStorage.java  | 110 +++++++++++++------
 .../replication/TestReplicationStateBasic.java  |   6 +
 .../TestZKReplicationQueueStorage.java          |  48 +++++++-
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  34 ++++--
 .../hadoop/hbase/zookeeper/TestZKUtil.java      |  25 +++++
 5 files changed, 178 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index e5a498a..19986f1 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -204,20 +204,25 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   }
 
   private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
-      List<ZKUtilOp> listOfOps) throws KeeperException {
+      List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
+    String peerId = new ReplicationQueueInfo(queueId).getPeerId();
     for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
-      String peerId = new ReplicationQueueInfo(queueId).getPeerId();
       String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
-      /*
-       * Make sure the existence of path
-       * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
-       * multiOrSequential() method said, if received a NodeExistsException, all operations will
-       * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
-       * because only need to make sure that update file position and sequence id atomically.
-       */
-      ZKUtil.createWithParents(zookeeper, path);
-      // Persist the max sequence id of region to zookeeper.
-      listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+      Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
+      byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
+      if (p.getSecond() < 0) { // ZNode does not exist.
+        ZKUtil.createWithParents(zookeeper,
+          path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
+        listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
+        continue;
+      }
+      // Perform CAS against the specific version v0 that was just read (HBASE-20138)
+      int v0 = p.getSecond();
+      long lastPushedSeqId = p.getFirst();
+      if (lastSeqEntry.getValue() <= lastPushedSeqId) {
+        continue;
+      }
+      listOfOps.add(ZKUtilOp.setData(path, data, v0));
     }
   }
 
@@ -225,50 +230,85 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
   public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
       Map<String, Long> lastSeqIds) throws ReplicationException {
     try {
-      List<ZKUtilOp> listOfOps = new ArrayList<>();
-      if (position > 0) {
-        listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
-          ZKUtil.positionToByteArray(position)));
+      for (int retry = 0;; retry++) {
+        List<ZKUtilOp> listOfOps = new ArrayList<>();
+        if (position > 0) {
+          listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
+            ZKUtil.positionToByteArray(position)));
+        }
+        // Persist the max sequence id(s) of regions for serial replication atomically.
+        addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
+        if (listOfOps.isEmpty()) {
+          return;
+        }
+        try {
+          ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+          return;
+        } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
+          LOG.warn(
+            "Bad version(or node exist) when persist the last pushed sequence id to zookeeper storage, "
+                + "Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId
+                + ", fileName=" + fileName);
+        }
       }
-      // Persist the max sequence id(s) of regions for serial replication atomically.
-      addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
-      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
     } catch (KeeperException e) {
       throw new ReplicationException("Failed to set log position (serverName=" + serverName
           + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
     }
   }
 
-  @Override
-  public long getLastSequenceId(String encodedRegionName, String peerId)
-      throws ReplicationException {
-    byte[] data;
-    try {
-      data =
-          ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
-    } catch (KeeperException | InterruptedException e) {
-      throw new ReplicationException("Failed to get the last sequence id(region="
-          + encodedRegionName + ", peerId=" + peerId + ")");
+  /**
+   * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. If ZNodeDataVersion is -1, it means
+   * that the ZNode does not exist.
+   */
+  @VisibleForTesting
+  protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
+      String peerId) throws KeeperException {
+    Stat stat = new Stat();
+    String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
+    byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
+    if (data == null) {
+      // ZNode does not exist, so just return version -1 to indicate that no node exists.
+      return Pair.newPair(HConstants.NO_SEQNUM, -1);
     }
     try {
-      return ZKUtil.parseWALPositionFrom(data);
+      return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
     } catch (DeserializationException de) {
       LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
           + "), data=" + Bytes.toStringBinary(data));
     }
-    return HConstants.NO_SEQNUM;
+    return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
+  }
+
+  @Override
+  public long getLastSequenceId(String encodedRegionName, String peerId)
+      throws ReplicationException {
+    try {
+      return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
+          + encodedRegionName + ", peerId=" + peerId + ")", e);
+    }
   }
 
   @Override
   public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
       throws ReplicationException {
     try {
+      // No need for CAS and retry here, because setLastSequenceIds() is called for disabled peers
+      // only, so no conflict can happen.
       List<ZKUtilOp> listOfOps = new ArrayList<>();
-      addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
-      ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+      for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+        String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+        ZKUtil.createWithParents(zookeeper, path);
+        listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+      }
+      if (!listOfOps.isEmpty()) {
+        ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+      }
     } catch (KeeperException e) {
-      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
-        ", lastSeqIds.size=" + lastSeqIds.size(), e);
+      throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
+          + ", size of lastSeqIds=" + lastSeqIds.size(), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 3ed4121..437804c 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -316,6 +316,12 @@ public abstract class TestReplicationStateBasic {
     }
     assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
     assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+
+    // Try to decrease the last pushed id by setWALPosition method.
+    rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
+      ImmutableMap.of(region0, 899L, region1, 1001L));
+    assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+    assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
   }
 
   protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
index ca86a05..5821271 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -44,6 +46,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ ReplicationTests.class, MediumTests.class })
 public class TestZKReplicationQueueStorage {
 
@@ -215,10 +219,11 @@ public class TestZKReplicationQueueStorage {
     assertEquals(1, v1 - v0);
   }
 
-  private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
+  private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
     return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
 
       private int called = 0;
+      private int getLastSeqIdOpIndex = 0;
 
       @Override
       protected int getQueuesZNodeCversion() throws KeeperException {
@@ -227,12 +232,26 @@ public class TestZKReplicationQueueStorage {
         }
         return called;
       }
+
+      @Override
+      protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
+          String peerId) throws KeeperException {
+        Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
+        if (getLastSeqIdOpIndex < 100) {
+          // Let the ZNode version increase.
+          String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
+          ZKUtil.createWithParents(zookeeper, path);
+          ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
+        }
+        getLastSeqIdOpIndex++;
+        return oldPair;
+      }
     };
   }
 
   @Test
   public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
-    ZKReplicationQueueStorage storage = createWithUnstableCversion();
+    ZKReplicationQueueStorage storage = createWithUnstableVersion();
     storage.addWAL(getServerName(0), "1", "file");
     // This should return eventually when cversion stabilizes
     Set<String> allWals = storage.getAllWALs();
@@ -243,7 +262,7 @@ public class TestZKReplicationQueueStorage {
   // For HBASE-14621
   @Test
   public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
-    ZKReplicationQueueStorage storage = createWithUnstableCversion();
+    ZKReplicationQueueStorage storage = createWithUnstableVersion();
     storage.addPeerToHFileRefs("1");
     Path p = new Path("/test");
     storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
@@ -253,6 +272,29 @@ public class TestZKReplicationQueueStorage {
     assertThat(allHFileRefs, hasItems("test"));
   }
 
+  // For HBASE-20138
+  @Test
+  public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
+    ZKReplicationQueueStorage storage = createWithUnstableVersion();
+    ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
+    assertTrue(storage.getAllQueues(serverName1).isEmpty());
+    String queue1 = "1";
+    String fileName = getFileName("file1", 0);
+    String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
+    storage.addWAL(serverName1, queue1, fileName);
+
+    List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
+    assertEquals(1, wals1.size());
+
+    assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
+    // This should return eventually when data version stabilizes
+    storage.setWALPosition(serverName1, queue1, fileName, 100,
+      ImmutableMap.of(encodedRegionName, 120L));
+
+    assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
+    assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
+  }
+
   @Test
   public void testRegionsZNodeLayout() throws Exception {
     String peerId = "1";

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 7bb4752..75ad0cb 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1500,11 +1500,18 @@ public final class ZKUtil {
     /**
      * @return a setData ZKUtilOp
      */
-    public static ZKUtilOp setData(String path, byte [] data) {
+    public static ZKUtilOp setData(String path, byte[] data) {
       return new SetData(path, data);
     }
 
     /**
+     * @return a setData ZKUtilOp that carries the expected data version of the znode
+     */
+    public static ZKUtilOp setData(String path, byte[] data, int version) {
+      return new SetData(path, data, version);
+    }
+
+    /**
      * @return path to znode where the ZKOp will occur
      */
     public String getPath() {
@@ -1578,17 +1585,28 @@ public final class ZKUtil {
      * ZKUtilOp representing setData in ZooKeeper
      */
     public static final class SetData extends ZKUtilOp {
-      private byte [] data;
+      private byte[] data;
+      private int version = -1;
+
+      private SetData(String path, byte[] data) {
+        super(path);
+        this.data = data;
+      }
 
-      private SetData(String path, byte [] data) {
+      private SetData(String path, byte[] data, int version) {
         super(path);
         this.data = data;
+        this.version = version;
       }
 
       public byte[] getData() {
         return data;
       }
 
+      public int getVersion() {
+        return version;
+      }
+
       @Override
       public boolean equals(Object o) {
         if (this == o) {
@@ -1599,13 +1617,15 @@ public final class ZKUtil {
         }
 
         SetData op = (SetData) o;
-        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+        return getPath().equals(op.getPath()) && Arrays.equals(data, op.data)
+            && getVersion() == op.getVersion();
       }
 
       @Override
       public int hashCode() {
         int ret = getPath().hashCode();
-        return ret * 31 + Bytes.hashCode(data);
+        ret = ret * 31 + Bytes.hashCode(data);
+        return ret * 31 + Integer.hashCode(version);
       }
     }
   }
@@ -1626,8 +1646,8 @@ public final class ZKUtil {
       DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
       return Op.delete(dnfs.getPath(), -1);
     } else if (op instanceof SetData) {
-      SetData sd = (SetData)op;
-      return Op.setData(sd.getPath(), sd.getData(), -1);
+      SetData sd = (SetData) op;
+      return Op.setData(sd.getPath(), sd.getData(), sd.getVersion());
     } else {
       throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
         + op.getClass().getName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a1d00c7/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 6c3279a..1508441 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.zookeeper;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ZKTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
@@ -46,6 +48,7 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
 @Category({ ZKTests.class, MediumTests.class })
@@ -117,6 +120,28 @@ public class TestZKUtil {
     assertNull(ZKUtil.getDataNoWatch(ZKW, "/l1/l2", null));
   }
 
+  private int getZNodeDataVersion(String znode) throws KeeperException {
+    Stat stat = new Stat();
+    ZKUtil.getDataNoWatch(ZKW, znode, stat);
+    return stat.getVersion();
+  }
+
+  @Test
+  public void testSetDataWithVersion() throws Exception {
+    ZKUtil.createWithParents(ZKW, "/s1/s2/s3");
+    int v0 = getZNodeDataVersion("/s1/s2/s3");
+    assertEquals(0, v0);
+
+    ZKUtil.setData(ZKW, "/s1/s2/s3", Bytes.toBytes(12L));
+    int v1 = getZNodeDataVersion("/s1/s2/s3");
+    assertEquals(1, v1);
+
+    ZKUtil.multiOrSequential(ZKW,
+      ImmutableList.of(ZKUtilOp.setData("/s1/s2/s3", Bytes.toBytes(13L), v1)), false);
+    int v2 = getZNodeDataVersion("/s1/s2/s3");
+    assertEquals(2, v2);
+  }
+
   /**
    * A test for HBASE-3238
    * @throws IOException A connection attempt to zk failed