You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/04 07:29:05 UTC
hbase git commit: HBASE-20378 Provide a hbck option to cleanup
replication barrier for a table
Repository: hbase
Updated Branches:
refs/heads/master 6225b4a49 -> 87f5b5f34
HBASE-20378 Provide a hbck option to cleanup replication barrier for a table
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/87f5b5f3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/87f5b5f3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/87f5b5f3
Branch: refs/heads/master
Commit: 87f5b5f3411d96c31b4cb61b9a57ced22be91d1f
Parents: 6225b4a
Author: jingyuntian <ti...@gmail.com>
Authored: Sat Apr 28 11:34:29 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri May 4 15:27:33 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 131 ++++++++++--
.../TestHBaseFsckCleanReplicationBarriers.java | 205 +++++++++++++++++++
.../hadoop/hbase/util/hbck/HbckTestingUtil.java | 20 +-
3 files changed, 336 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 9fcf320..6d9ca9a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
@@ -85,6 +86,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -99,7 +101,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -115,6 +119,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -268,11 +276,13 @@ public class HBaseFsck extends Configured implements Closeable {
private boolean fixHFileLinks = false; // fix lingering HFileLinks
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
private boolean fixReplication = false; // fix undeleted replication queues for removed peer
+ private boolean cleanReplicationBarrier = false; // clean replication barriers of a table
private boolean fixAny = false; // Set to true if any of the fix is required.
// limit checking/fixes to listed tables, if empty attempt to check/fix all
// hbase:meta are always checked
private Set<TableName> tablesIncluded = new HashSet<>();
+ private TableName cleanReplicationBarrierTable;
private int maxMerge = DEFAULT_MAX_MERGE; // maximum number of overlapping regions to merge
// maximum number of overlapping regions to sideline
private int maxOverlapsToSideline = DEFAULT_OVERLAPS_TO_SIDELINE;
@@ -786,6 +796,8 @@ public class HBaseFsck extends Configured implements Closeable {
checkAndFixReplication();
+ cleanReplicationBarrier();
+
// Remove the hbck znode
cleanupHbckZnode();
@@ -4118,14 +4130,13 @@ public class HBaseFsck extends Configured implements Closeable {
enum ERROR_CODE {
UNKNOWN, NO_META_REGION, NULL_META_REGION, NO_VERSION_FILE, NOT_IN_META_HDFS, NOT_IN_META,
NOT_IN_META_OR_DEPLOYED, NOT_IN_HDFS_OR_DEPLOYED, NOT_IN_HDFS, SERVER_DOES_NOT_MATCH_META,
- NOT_DEPLOYED,
- MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
+ NOT_DEPLOYED, MULTI_DEPLOYED, SHOULD_NOT_BE_DEPLOYED, MULTI_META_REGION, RS_CONNECT_FAILURE,
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR,
ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS,
- UNSUPPORTED_OPTION
+ UNSUPPORTED_OPTION, INVALID_TABLE
}
void clear();
void report(String message);
@@ -4557,6 +4568,10 @@ public class HBaseFsck extends Configured implements Closeable {
fixAny |= shouldFix;
}
+ public void setCleanReplicationBarrier(boolean shouldClean) {
+ cleanReplicationBarrier = shouldClean;
+ }
+
/**
* Check if we should rerun fsck again. This checks if we've tried to
* fix something and we should rerun fsck tool again.
@@ -4567,7 +4582,7 @@ public class HBaseFsck extends Configured implements Closeable {
rerun = true;
}
- boolean shouldRerun() {
+ public boolean shouldRerun() {
return rerun;
}
@@ -4848,7 +4863,11 @@ public class HBaseFsck extends Configured implements Closeable {
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles" +
"-fixHFileLinks");
out.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
-
+ out.println("");
+ out.println(" Replication options");
+ out.println(" -fixReplication Deletes replication queues for removed peers");
+ out.println(" -cleanReplicationBrarier [tableName] clean the replication barriers " +
+ "of a specified table, tableName is required");
out.flush();
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
@@ -4908,13 +4927,12 @@ public class HBaseFsck extends Configured implements Closeable {
return printUsageAndExit();
}
try {
- long timelag = Long.parseLong(args[i+1]);
+ long timelag = Long.parseLong(args[++i]);
setTimeLag(timelag);
} catch (NumberFormatException e) {
errors.reportError(ERROR_CODE.WRONG_USAGE, "-timelag needs a numeric value.");
return printUsageAndExit();
}
- i++;
} else if (cmd.equals("-sleepBeforeRerun")) {
if (i == args.length - 1) {
errors.reportError(ERROR_CODE.WRONG_USAGE,
@@ -4922,19 +4940,17 @@ public class HBaseFsck extends Configured implements Closeable {
return printUsageAndExit();
}
try {
- sleepBeforeRerun = Long.parseLong(args[i+1]);
+ sleepBeforeRerun = Long.parseLong(args[++i]);
} catch (NumberFormatException e) {
errors.reportError(ERROR_CODE.WRONG_USAGE, "-sleepBeforeRerun needs a numeric value.");
return printUsageAndExit();
}
- i++;
} else if (cmd.equals("-sidelineDir")) {
if (i == args.length - 1) {
errors.reportError(ERROR_CODE.WRONG_USAGE, "HBaseFsck: -sidelineDir needs a value.");
return printUsageAndExit();
}
- i++;
- setSidelineDir(args[i]);
+ setSidelineDir(args[++i]);
} else if (cmd.equals("-fix")) {
errors.reportError(ERROR_CODE.WRONG_USAGE,
"This option is deprecated, please use -fixAssignments instead.");
@@ -5004,14 +5020,13 @@ public class HBaseFsck extends Configured implements Closeable {
return printUsageAndExit();
}
try {
- int maxOverlapsToSideline = Integer.parseInt(args[i+1]);
+ int maxOverlapsToSideline = Integer.parseInt(args[++i]);
setMaxOverlapsToSideline(maxOverlapsToSideline);
} catch (NumberFormatException e) {
errors.reportError(ERROR_CODE.WRONG_USAGE,
"-maxOverlapsToSideline needs a numeric value argument.");
return printUsageAndExit();
}
- i++;
} else if (cmd.equals("-maxMerge")) {
if (i == args.length - 1) {
errors.reportError(ERROR_CODE.WRONG_USAGE,
@@ -5019,14 +5034,13 @@ public class HBaseFsck extends Configured implements Closeable {
return printUsageAndExit();
}
try {
- int maxMerge = Integer.parseInt(args[i+1]);
+ int maxMerge = Integer.parseInt(args[++i]);
setMaxMerge(maxMerge);
} catch (NumberFormatException e) {
errors.reportError(ERROR_CODE.WRONG_USAGE,
"-maxMerge needs a numeric value argument.");
return printUsageAndExit();
}
- i++;
} else if (cmd.equals("-summary")) {
setSummary();
} else if (cmd.equals("-metaonly")) {
@@ -5035,6 +5049,12 @@ public class HBaseFsck extends Configured implements Closeable {
setRegionBoundariesCheck();
} else if (cmd.equals("-fixReplication")) {
setFixReplication(true);
+ } else if (cmd.equals("-cleanReplicationBarrier")) {
+ setCleanReplicationBarrier(true);
+ if(args[++i].startsWith("-")){
+ printUsageAndExit();
+ }
+ setCleanReplicationBarrierTable(args[i]);
} else if (cmd.startsWith("-")) {
errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
return printUsageAndExit();
@@ -5120,7 +5140,7 @@ public class HBaseFsck extends Configured implements Closeable {
boolean result = true;
String hbaseServerVersion = status.getHBaseVersion();
Object[] versionComponents = VersionInfo.getVersionComponents(hbaseServerVersion);
- if (versionComponents[0] instanceof Integer && ((Integer)versionComponents[0]) >= 2) {
+ if (versionComponents[0] instanceof Integer && ((Integer) versionComponents[0]) >= 2) {
// Process command-line args.
for (String arg : args) {
if (unsupportedOptionsInV2.contains(arg)) {
@@ -5134,6 +5154,85 @@ public class HBaseFsck extends Configured implements Closeable {
return result;
}
+ public void setCleanReplicationBarrierTable(String cleanReplicationBarrierTable) {
+ this.cleanReplicationBarrierTable = TableName.valueOf(cleanReplicationBarrierTable);
+ }
+
+ public void cleanReplicationBarrier() throws IOException {
+ if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
+ return;
+ }
+ if (cleanReplicationBarrierTable.isSystemTable()) {
+ errors.reportError(ERROR_CODE.INVALID_TABLE,
+ "invalid table: " + cleanReplicationBarrierTable);
+ return;
+ }
+
+ boolean isGlobalScope = false;
+ try {
+ isGlobalScope = admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope();
+ } catch (TableNotFoundException e) {
+ LOG.info("we may need to clean some erroneous data due to bugs");
+ }
+
+ if (isGlobalScope) {
+ errors.reportError(ERROR_CODE.INVALID_TABLE,
+ "table's replication scope is global: " + cleanReplicationBarrierTable);
+ return;
+ }
+ List<byte[]> regionNames = new ArrayList<>();
+ Scan barrierScan = new Scan();
+ barrierScan.setCaching(100);
+ barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+ barrierScan
+ .withStartRow(MetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable,
+ MetaTableAccessor.QueryType.REGION))
+ .withStopRow(MetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable,
+ MetaTableAccessor.QueryType.REGION));
+ Result result;
+ try (ResultScanner scanner = meta.getScanner(barrierScan)) {
+ while ((result = scanner.next()) != null) {
+ regionNames.add(result.getRow());
+ }
+ }
+ if (regionNames.size() <= 0) {
+ errors.reportError(ERROR_CODE.INVALID_TABLE,
+ "there is no barriers of this table: " + cleanReplicationBarrierTable);
+ return;
+ }
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
+ List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
+ if (peerDescriptions != null && peerDescriptions.size() > 0) {
+ List<String> peers = peerDescriptions.stream()
+ .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
+ cleanReplicationBarrierTable))
+ .map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
+ try {
+ List<String> batch = new ArrayList<>();
+ for (String peer : peers) {
+ for (byte[] regionName : regionNames) {
+ batch.add(RegionInfo.encodeRegionName(regionName));
+ if (batch.size() % 100 == 0) {
+ queueStorage.removeLastSequenceIds(peer, batch);
+ batch.clear();
+ }
+ }
+ if (batch.size() > 0) {
+ queueStorage.removeLastSequenceIds(peer, batch);
+ batch.clear();
+ }
+ }
+ } catch (ReplicationException re) {
+ throw new IOException(re);
+ }
+ }
+ for (byte[] regionName : regionNames) {
+ meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
+ }
+ setShouldRerun();
+ }
+
/**
* ls -r for debugging purposes
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
new file mode 100644
index 0000000..375f2ed
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckCleanReplicationBarriers.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestHBaseFsckCleanReplicationBarriers {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHBaseFsckCleanReplicationBarriers.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static String PEER_1 = "1", PEER_2 = "2";
+
+ private static ReplicationQueueStorage QUEUE_STORAGE;
+
+ private static String WAL_FILE_NAME = "test.wal";
+
+ private static String TABLE_NAME = "test";
+
+ private static String COLUMN_FAMILY = "info";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(1);
+ QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+ UTIL.getConfiguration());
+ createPeer();
+ QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_1,
+ WAL_FILE_NAME);
+ QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_2,
+ WAL_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCleanReplicationBarrierWithNonExistTable()
+ throws ClassNotFoundException, IOException {
+ TableName tableName = TableName.valueOf(TABLE_NAME + "_non");
+ boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+ assertFalse(cleaned);
+ }
+
+ @Test
+ public void testCleanReplicationBarrierWithDeletedTable() throws Exception {
+ TableName tableName = TableName.valueOf(TABLE_NAME + "_deleted");
+ List<RegionInfo> regionInfos = new ArrayList<>();
+ // only write some barriers into meta table
+
+ for (int i = 0; i < 110; i++) {
+ RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(i))
+ .setEndKey(Bytes.toBytes(i + 1)).build();
+ regionInfos.add(regionInfo);
+ addStateAndBarrier(regionInfo, RegionState.State.OPEN, 10, 100);
+ updatePushedSeqId(regionInfo, 10);
+ assertEquals("check if there is lastPushedId", 10,
+ QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
+ assertEquals("check if there is lastPushedId", 10,
+ QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
+ }
+ Scan barrierScan = new Scan();
+ barrierScan.setCaching(100);
+ barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+ barrierScan
+ .withStartRow(
+ MetaTableAccessor.getTableStartRowForMeta(tableName, MetaTableAccessor.QueryType.REGION))
+ .withStopRow(
+ MetaTableAccessor.getTableStopRowForMeta(tableName, MetaTableAccessor.QueryType.REGION));
+ Result result;
+ try (ResultScanner scanner =
+ MetaTableAccessor.getMetaHTable(UTIL.getConnection()).getScanner(barrierScan)) {
+ while ((result = scanner.next()) != null) {
+ assertTrue(MetaTableAccessor.getReplicationBarriers(result).length > 0);
+ }
+ }
+ boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+ assertTrue(cleaned);
+ for (RegionInfo regionInfo : regionInfos) {
+ assertEquals("check if there is lastPushedId", -1,
+ QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_1));
+ assertEquals("check if there is lastPushedId", -1,
+ QUEUE_STORAGE.getLastSequenceId(regionInfo.getEncodedName(), PEER_2));
+ }
+ cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+ assertFalse(cleaned);
+ for (RegionInfo region : regionInfos) {
+ assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
+ region.getRegionName()).length);
+ }
+ }
+
+ @Test
+ public void testCleanReplicationBarrierWithExistTable() throws Exception {
+ TableName tableName = TableName.valueOf(TABLE_NAME);
+ String cf = COLUMN_FAMILY;
+ TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
+ .setReplicationScope(HConstants.REPLICATION_SCOPE_LOCAL).build();
+ UTIL.createTable(tableDescriptor, Bytes.split(Bytes.toBytes(1), Bytes.toBytes(256), 123));
+ assertTrue(UTIL.getAdmin().getRegions(tableName).size() > 0);
+ for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+ addStateAndBarrier(region, RegionState.State.OFFLINE, 10, 100);
+ updatePushedSeqId(region, 10);
+ assertEquals("check if there is lastPushedId", 10,
+ QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+ assertEquals("check if there is lastPushedId", 10,
+ QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+ }
+ boolean cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+ assertTrue(cleaned);
+ for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+ assertEquals("check if there is lastPushedId", -1,
+ QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_1));
+ assertEquals("check if there is lastPushedId", -1,
+ QUEUE_STORAGE.getLastSequenceId(region.getEncodedName(), PEER_2));
+ }
+ cleaned = HbckTestingUtil.cleanReplicationBarrier(UTIL.getConfiguration(), tableName);
+ assertFalse(cleaned);
+ for (RegionInfo region : UTIL.getAdmin().getRegions(tableName)) {
+ assertEquals(0, MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(),
+ region.getRegionName()).length);
+ }
+ }
+
+ public static void createPeer() throws IOException {
+ ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
+ UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
+ UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
+ }
+
+ private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+ throws IOException {
+ Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+ if (state != null) {
+ put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+ Bytes.toBytes(state.name()));
+ }
+ for (int i = 0; i < barriers.length; i++) {
+ put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+ put.getTimeStamp() - barriers.length + i, Bytes.toBytes(barriers[i]));
+ }
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
+ private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+ QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+ PEER_1, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+ QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+ PEER_2, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/87f5b5f3/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index 99e4f08..1808b5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util.hbck;
import static org.junit.Assert.assertEquals;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,14 +41,14 @@ public class HbckTestingUtil {
public static HBaseFsck doFsck(
Configuration conf, boolean fix, TableName table) throws Exception {
- return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
+ return doFsck(conf, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, fix, table);
}
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
- boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks,
- boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, boolean fixReplication,
- TableName table) throws Exception {
+ boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
+ boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+ boolean fixReplication, boolean cleanReplicationBarrier, TableName table) throws Exception {
HBaseFsck fsck = new HBaseFsck(conf, exec);
try {
HBaseFsck.setDisplayFullReport(); // i.e. -details
@@ -63,6 +64,7 @@ public class HbckTestingUtil {
fsck.setFixHFileLinks(fixHFileLinks);
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
fsck.setFixReplication(fixReplication);
+ fsck.setCleanReplicationBarrier(cleanReplicationBarrier);
if (table != null) {
fsck.includeTable(table);
}
@@ -88,6 +90,16 @@ public class HbckTestingUtil {
return hbck;
}
+ public static boolean cleanReplicationBarrier(Configuration conf, TableName table)
+ throws IOException, ClassNotFoundException {
+ HBaseFsck hbck = new HBaseFsck(conf, null);
+ hbck.setCleanReplicationBarrierTable(table.getNameAsString());
+ hbck.setCleanReplicationBarrier(true);
+ hbck.connect();
+ hbck.cleanReplicationBarrier();
+ return hbck.shouldRerun();
+ }
+
public static boolean inconsistencyFound(HBaseFsck fsck) throws Exception {
List<ERROR_CODE> errs = fsck.getErrors().getErrorList();
return (errs != null && !errs.isEmpty());