You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/09 07:19:30 UTC
[10/22] hbase git commit: HBASE-20129 Add UT for serial replication
checker
HBASE-20129 Add UT for serial replication checker
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8b61a061
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8b61a061
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8b61a061
Branch: refs/heads/HBASE-20046-branch-2
Commit: 8b61a061d3d8cdf9d4bd271cc6211d669e925e60
Parents: f29bf1d
Author: zhangduo <zh...@apache.org>
Authored: Tue Mar 6 08:40:31 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Apr 9 15:18:44 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/MetaTableAccessor.java | 71 ++++++++--
.../regionserver/SerialReplicationChecker.java | 18 +++
.../TestSerialReplicationChecker.java | 133 ++++++++++++++++++-
3 files changed, 208 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 109f2d0..2a88b56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.hbase;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
+import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -35,7 +37,6 @@ import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.client.Connection;
@@ -150,11 +151,13 @@ public class MetaTableAccessor {
META_REGION_PREFIX, 0, len);
}
- private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
+ @VisibleForTesting
+ public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
+
+ private static final byte ESCAPE_BYTE = (byte) 0xFF;
- private static final String REPLICATION_PARENT_SEPARATOR = "|";
+ private static final byte SEPARATED_BYTE = 0x00;
- private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
/**
* Lists all of the table regions currently in META.
* Deprecated, keep there until some test use this.
@@ -1921,10 +1924,51 @@ public class MetaTableAccessor {
.build());
}
+ private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
+ for (byte b : regionName) {
+ if (b == ESCAPE_BYTE) {
+ out.write(ESCAPE_BYTE);
+ }
+ out.write(b);
+ }
+ }
+
+ @VisibleForTesting
+ public static byte[] getParentsBytes(List<RegionInfo> parents) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Iterator<RegionInfo> iter = parents.iterator();
+ writeRegionName(bos, iter.next().getRegionName());
+ while (iter.hasNext()) {
+ bos.write(ESCAPE_BYTE);
+ bos.write(SEPARATED_BYTE);
+ writeRegionName(bos, iter.next().getRegionName());
+ }
+ return bos.toByteArray();
+ }
+
+ private static List<byte[]> parseParentsBytes(byte[] bytes) {
+ List<byte[]> parents = new ArrayList<>();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ for (int i = 0; i < bytes.length; i++) {
+ if (bytes[i] == ESCAPE_BYTE) {
+ i++;
+ if (bytes[i] == SEPARATED_BYTE) {
+ parents.add(bos.toByteArray());
+ bos.reset();
+ continue;
+ }
+ // fall through to append the byte
+ }
+ bos.write(bytes[i]);
+ }
+ if (bos.size() > 0) {
+ parents.add(bos.toByteArray());
+ }
+ return parents;
+ }
+
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
- byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
- .map(RegionInfo::getRegionNameAsString).collect(Collectors
- .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
+ byte[] value = getParentsBytes(parents);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
@@ -1995,6 +2039,14 @@ public class MetaTableAccessor {
public List<byte[]> getParentRegionNames() {
return parentRegionNames;
}
+
+ @Override
+ public String toString() {
+ return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) + ", state=" +
+ state + ", parentRegionNames=" +
+ parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(", ")) +
+ "]";
+ }
}
private static long getReplicationBarrier(Cell c) {
@@ -2014,10 +2066,7 @@ public class MetaTableAccessor {
byte[] parentRegionsBytes =
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
List<byte[]> parentRegionNames =
- parentRegionsBytes != null
- ? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
- .map(Bytes::toBytes).collect(Collectors.toList())
- : Collections.emptyList();
+ parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 95f3868..9276359 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@@ -108,6 +110,8 @@ import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@InterfaceAudience.Private
class SerialReplicationChecker {
+ private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);
+
public static final String REPLICATION_SERIALLY_WAITING_KEY =
"hbase.serial.replication.waiting.ms";
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
@@ -182,9 +186,11 @@ class SerialReplicationChecker {
long seqId = entry.getKey().getSequenceId();
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
+ LOG.debug("Replication barrier for {}: {}", entry, barrierResult);
long[] barriers = barrierResult.getBarriers();
int index = Arrays.binarySearch(barriers, seqId);
if (index == -1) {
+ LOG.debug("{} is before the first barrier, pass", entry);
// This means we are in the range before the first record openSeqNum, this usually because the
// wal is written before we enable serial replication for this table, just return true since
// we can not guarantee the order.
@@ -203,22 +209,29 @@ class SerialReplicationChecker {
// we are in the first range, check whether we have parents
for (byte[] regionName : barrierResult.getParentRegionNames()) {
if (!isParentFinished(regionName)) {
+ LOG.debug("Parent {} has not been finished yet for entry {}, give up",
+ Bytes.toStringBinary(regionName), entry);
return false;
}
}
if (isLastRangeAndOpening(barrierResult, index)) {
+ LOG.debug("{} is in the last range and the region is opening, give up", entry);
return false;
}
+ LOG.debug("{} is in the first range, pass", entry);
recordCanPush(encodedNameAsString, seqId, barriers, 1);
return true;
}
// check whether the previous range is finished
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
+ LOG.debug("Previous range for {} has not been finished yet, give up", entry);
return false;
}
if (isLastRangeAndOpening(barrierResult, index)) {
+ LOG.debug("{} is in the last range and the region is opening, give up", entry);
return false;
}
+ LOG.debug("The previous range for {} has been finished, pass", entry);
recordCanPush(encodedNameAsString, seqId, barriers, index);
return true;
}
@@ -229,8 +242,11 @@ class SerialReplicationChecker {
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
if (canReplicateUnderSeqId != null) {
if (seqId < canReplicateUnderSeqId.longValue()) {
+ LOG.trace("{} is before the end barrier {}, pass", entry, canReplicateUnderSeqId);
return true;
}
+ LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
+ canReplicateUnderSeqId);
// we are already beyond the last safe point, remove
canPushUnder.invalidate(encodedNameAsString);
}
@@ -239,6 +255,7 @@ class SerialReplicationChecker {
// has been moved to another RS and then back, so we need to check the barrier.
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
if (seqId == previousPushedSeqId.longValue() + 1) {
+ LOG.trace("The sequence id for {} is continuous, pass");
previousPushedSeqId.increment();
return true;
}
@@ -249,6 +266,7 @@ class SerialReplicationChecker {
throws IOException, InterruptedException {
byte[] row = CellUtil.cloneRow(firstCellInEdit);
while (!canPush(entry, row)) {
+ LOG.debug("Can not push{}, wait", entry);
Thread.sleep(waitTimeMs);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8b61a061/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index c8387c5..58e9543 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -19,10 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -30,8 +36,10 @@ import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -54,6 +62,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -72,6 +82,8 @@ public class TestSerialReplicationChecker {
private static String WAL_FILE_NAME = "test.wal";
+ private Connection conn;
+
private SerialReplicationChecker checker;
@Rule
@@ -98,8 +110,18 @@ public class TestSerialReplicationChecker {
ReplicationSource source = mock(ReplicationSource.class);
when(source.getPeerId()).thenReturn(PEER_ID);
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+ conn = mock(Connection.class);
+ when(conn.isClosed()).thenReturn(false);
+ doAnswer(new Answer<Table>() {
+
+ @Override
+ public Table answer(InvocationOnMock invocation) throws Throwable {
+ return UTIL.getConnection().getTable((TableName) invocation.getArgument(0));
+ }
+
+ }).when(conn).getTable(any(TableName.class));
Server server = mock(Server.class);
- when(server.getConnection()).thenReturn(UTIL.getConnection());
+ when(server.getConnection()).thenReturn(conn);
when(source.getServer()).thenReturn(server);
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
tableName = TableName.valueOf(name.getMethodName());
@@ -129,8 +151,10 @@ public class TestSerialReplicationChecker {
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
- put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
- Bytes.toBytes(state.name()));
+ if (state != null) {
+ put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+ Bytes.toBytes(state.name()));
+ }
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
@@ -154,6 +178,15 @@ public class TestSerialReplicationChecker {
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
}
+ private void addParents(RegionInfo region, List<RegionInfo> parents) throws IOException {
+ Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+ put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY,
+ MetaTableAccessor.REPLICATION_PARENT_QUALIFIER, MetaTableAccessor.getParentsBytes(parents));
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
@Test
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
@@ -173,4 +206,98 @@ public class TestSerialReplicationChecker {
setState(region, RegionState.State.OPENING);
assertFalse(checker.canPush(createEntry(region, 104), cell));
}
+
+ @Test
+ public void testCanPushUnder() throws IOException, ReplicationException {
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
+ updatePushedSeqId(region, 9);
+ Cell cell = createCell(region);
+ assertTrue(checker.canPush(createEntry(region, 20), cell));
+ verify(conn, times(1)).getTable(any(TableName.class));
+ // not continuous
+ for (int i = 22; i < 100; i += 2) {
+ assertTrue(checker.canPush(createEntry(region, i), cell));
+ }
+ // verify that we do not go to meta table
+ verify(conn, times(1)).getTable(any(TableName.class));
+ }
+
+ @Test
+ public void testCanPushIfContinuous() throws IOException, ReplicationException {
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addStateAndBarrier(region, RegionState.State.OPEN, 10);
+ updatePushedSeqId(region, 9);
+ Cell cell = createCell(region);
+ assertTrue(checker.canPush(createEntry(region, 20), cell));
+ verify(conn, times(1)).getTable(any(TableName.class));
+ // continuous
+ for (int i = 21; i < 100; i++) {
+ assertTrue(checker.canPush(createEntry(region, i), cell));
+ }
+ // verify that we do not go to meta table
+ verify(conn, times(1)).getTable(any(TableName.class));
+ }
+
+ @Test
+ public void testCanPushAfterMerge() throws IOException, ReplicationException {
+ // 0xFF is the escape byte when storing region name so let's make sure it can work.
+ byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+ RegionInfo regionA =
+ RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(1).build();
+ RegionInfo regionB =
+ RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(2).build();
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(3).build();
+ addStateAndBarrier(regionA, null, 10, 100);
+ addStateAndBarrier(regionB, null, 20, 200);
+ addStateAndBarrier(region, RegionState.State.OPEN, 200);
+ addParents(region, Arrays.asList(regionA, regionB));
+ Cell cell = createCell(region);
+ // can not push since both parents have not been finished yet
+ assertFalse(checker.canPush(createEntry(region, 300), cell));
+ updatePushedSeqId(regionB, 199);
+ // can not push since regionA has not been finished yet
+ assertFalse(checker.canPush(createEntry(region, 300), cell));
+ updatePushedSeqId(regionA, 99);
+ // can push since all parents have been finished
+ assertTrue(checker.canPush(createEntry(region, 300), cell));
+ }
+
+ @Test
+ public void testCanPushAfterSplit() throws IOException, ReplicationException {
+ // 0xFF is the escape byte when storing region name so let's make sure it can work.
+ byte[] endKey = new byte[] { (byte) 0xFF, 0x00, (byte) 0xFF, (byte) 0xFF, 0x01 };
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).setRegionId(1).build();
+ RegionInfo regionA =
+ RegionInfoBuilder.newBuilder(tableName).setEndKey(endKey).setRegionId(2).build();
+ RegionInfo regionB =
+ RegionInfoBuilder.newBuilder(tableName).setStartKey(endKey).setRegionId(3).build();
+ addStateAndBarrier(region, null, 10, 100);
+ addStateAndBarrier(regionA, RegionState.State.OPEN, 100, 200);
+ addStateAndBarrier(regionB, RegionState.State.OPEN, 100, 300);
+ addParents(regionA, Arrays.asList(region));
+ addParents(regionB, Arrays.asList(region));
+ Cell cellA = createCell(regionA);
+ Cell cellB = createCell(regionB);
+ // can not push since parent has not been finished yet
+ assertFalse(checker.canPush(createEntry(regionA, 150), cellA));
+ assertFalse(checker.canPush(createEntry(regionB, 200), cellB));
+ updatePushedSeqId(region, 99);
+ // can push since parent has been finished
+ assertTrue(checker.canPush(createEntry(regionA, 150), cellA));
+ assertTrue(checker.canPush(createEntry(regionB, 200), cellB));
+ }
+
+ @Test
+ public void testCanPushEqualsToBarrier() throws IOException, ReplicationException {
+ // For binary search, equals to an element will result to a positive value, let's test whether
+ // it works.
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addStateAndBarrier(region, RegionState.State.OPEN, 10, 100);
+ Cell cell = createCell(region);
+ assertTrue(checker.canPush(createEntry(region, 10), cell));
+ assertFalse(checker.canPush(createEntry(region, 100), cell));
+ updatePushedSeqId(region, 99);
+ assertTrue(checker.canPush(createEntry(region, 100), cell));
+ }
}