You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/04/08 03:40:18 UTC
[20/20] hbase git commit: HBASE-20296 Remove last pushed sequence ids when removing tables from a peer
HBASE-20296 Remove last pushed sequence ids when removing tables from a peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e821b9ac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e821b9ac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e821b9ac
Branch: refs/heads/HBASE-20046-branch-2
Commit: e821b9ac5420bda2e2f964907a49b059a97525cc
Parents: eb5dd93
Author: zhangduo <zh...@apache.org>
Authored: Sat Mar 31 20:25:13 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Apr 8 11:38:01 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/MetaTableAccessor.java | 72 +++++-----
.../replication/ReplicationQueueStorage.java | 9 ++
.../replication/ZKReplicationQueueStorage.java | 15 +++
.../master/replication/AddPeerProcedure.java | 14 +-
.../master/replication/ModifyPeerProcedure.java | 134 ++++++++++---------
.../replication/UpdatePeerConfigProcedure.java | 96 ++++++++++++-
.../hadoop/hbase/client/TestEnableTable.java | 4 +-
.../TestRemoveFromSerialReplicationPeer.java | 120 +++++++++++++++++
8 files changed, 363 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 4cc46c8..0f5ef09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
@@ -682,20 +684,19 @@ public class MetaTableAccessor {
scanMeta(connection, null, null, QueryType.ALL, v);
}
- public static void scanMetaForTableRegions(Connection connection,
- Visitor visitor, TableName tableName) throws IOException {
+ public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
+ TableName tableName) throws IOException {
scanMeta(connection, tableName, QueryType.REGION, Integer.MAX_VALUE, visitor);
}
- public static void scanMeta(Connection connection, TableName table,
- QueryType type, int maxRows, final Visitor visitor) throws IOException {
+ public static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
+ final Visitor visitor) throws IOException {
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
- type, maxRows, visitor);
+ type, maxRows, visitor);
}
- public static void scanMeta(Connection connection,
- @Nullable final byte[] startRow, @Nullable final byte[] stopRow,
- QueryType type, final Visitor visitor) throws IOException {
+ public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+ @Nullable final byte[] stopRow, QueryType type, final Visitor visitor) throws IOException {
scanMeta(connection, startRow, stopRow, type, Integer.MAX_VALUE, visitor);
}
@@ -708,26 +709,19 @@ public class MetaTableAccessor {
* @param tableName table withing we scan
* @param row start scan from this row
* @param rowLimit max number of rows to return
- * @throws IOException
*/
- public static void scanMeta(Connection connection,
- final Visitor visitor, final TableName tableName,
- final byte[] row, final int rowLimit)
- throws IOException {
-
+ public static void scanMeta(Connection connection, final Visitor visitor,
+ final TableName tableName, final byte[] row, final int rowLimit) throws IOException {
byte[] startRow = null;
byte[] stopRow = null;
if (tableName != null) {
- startRow =
- getTableStartRowForMeta(tableName, QueryType.REGION);
+ startRow = getTableStartRowForMeta(tableName, QueryType.REGION);
if (row != null) {
- RegionInfo closestRi =
- getClosestRegionInfo(connection, tableName, row);
- startRow = RegionInfo
- .createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
+ RegionInfo closestRi = getClosestRegionInfo(connection, tableName, row);
+ startRow =
+ RegionInfo.createRegionName(tableName, closestRi.getStartKey(), HConstants.ZEROES, false);
}
- stopRow =
- getTableStopRowForMeta(tableName, QueryType.REGION);
+ stopRow = getTableStopRowForMeta(tableName, QueryType.REGION);
}
scanMeta(connection, startRow, stopRow, QueryType.REGION, rowLimit, visitor);
}
@@ -743,11 +737,16 @@ public class MetaTableAccessor {
* @param type scanned part of meta
* @param maxRows maximum rows to return
* @param visitor Visitor invoked against each row.
- * @throws IOException
*/
public static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, int maxRows, final Visitor visitor)
throws IOException {
+ scanMeta(connection, startRow, stopRow, type, null, maxRows, visitor);
+ }
+
+ private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
+ @Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
+ final Visitor visitor) throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection, rowUpperLimit);
@@ -760,13 +759,14 @@ public class MetaTableAccessor {
if (stopRow != null) {
scan.withStopRow(stopRow);
}
+ if (filter != null) {
+ scan.setFilter(filter);
+ }
if (LOG.isTraceEnabled()) {
- LOG.trace("Scanning META"
- + " starting at row=" + Bytes.toStringBinary(startRow)
- + " stopping at row=" + Bytes.toStringBinary(stopRow)
- + " for max=" + rowUpperLimit
- + " with caching=" + scan.getCaching());
+ LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow) +
+ " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit +
+ " with caching=" + scan.getCaching());
}
int currentRow = 0;
@@ -1973,7 +1973,7 @@ public class MetaTableAccessor {
byte[] value = getParentsBytes(parents);
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
- .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+ .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
}
private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
@@ -1988,7 +1988,7 @@ public class MetaTableAccessor {
.setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
.setQualifier(HConstants.SEQNUM_QUALIFIER)
- .setTimestamp(put.getTimeStamp())
+ .setTimestamp(put.getTimestamp())
.setType(Type.Put)
.setValue(Bytes.toBytes(openSeqNum))
.build());
@@ -2128,6 +2128,18 @@ public class MetaTableAccessor {
return list;
}
+ public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
+ TableName tableName) throws IOException {
+ List<String> list = new ArrayList<>();
+ scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+ getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION,
+ new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
+ list.add(RegionInfo.encodeRegionName(r.getRow()));
+ return true;
+ });
+ return list;
+ }
+
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cd37ac2..84653ad 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -91,6 +91,15 @@ public interface ReplicationQueueStorage {
* @param peerId peer id
*/
void removeLastSequenceIds(String peerId) throws ReplicationException;
+
+ /**
+ * Remove the max sequence id record for the given peer and regions.
+ * @param peerId peer id
+ * @param encodedRegionNames the encoded region names
+ */
+ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+ throws ReplicationException;
+
/**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 96b0b91..6d72128 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -347,6 +348,20 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
+ public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+ throws ReplicationException {
+ try {
+ List<ZKUtilOp> listOfOps =
+ encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
+ .map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
+ ", encodedRegionNames.size=" + encodedRegionNames.size(), e);
+ }
+ }
+
+ @Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 72228f6..2f2d5a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AddPeerStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
/**
* The procedure for adding a new replication peer.
@@ -57,8 +58,15 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
}
@Override
- protected boolean reopenRegionsAfterRefresh() {
- return true;
+ protected PeerModificationState nextStateAfterRefresh() {
+ return peerConfig.isSerial() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+ : super.nextStateAfterRefresh();
+ }
+
+ @Override
+ protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
+ setLastPushedSequenceId(env, peerConfig);
}
@Override
@@ -102,7 +110,7 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
serializer.serialize(AddPeerStateData.newBuilder()
- .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
+ .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).setEnabled(enabled).build());
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 2b76487..8bedeff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Stream;
import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -55,7 +54,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
- private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
+ protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
protected ModifyPeerProcedure() {
}
@@ -93,12 +92,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
/**
- * Implementation class can override this method. The default return value is false which means we
- * will jump to POST_PEER_MODIFICATION and finish the procedure. If returns true, we will jump to
- * SERIAL_PEER_REOPEN_REGIONS.
+ * Implementation class can override this method. By default we will jump to
+ * POST_PEER_MODIFICATION and finish the procedure.
*/
- protected boolean reopenRegionsAfterRefresh() {
- return false;
+ protected PeerModificationState nextStateAfterRefresh() {
+ return PeerModificationState.POST_PEER_MODIFICATION;
}
/**
@@ -123,80 +121,97 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
throw new UnsupportedOperationException();
}
- private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
- ReplicationPeerConfig peerConfig = getNewPeerConfig();
- Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
- .stream().filter(TableDescriptor::hasGlobalReplicationScope)
- .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
- ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
- if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
- stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
- }
- return stream;
+ protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
+ throw new UnsupportedOperationException();
}
private void reopenRegions(MasterProcedureEnv env) throws IOException {
- Stream<TableDescriptor> stream = getTables(env);
+ ReplicationPeerConfig peerConfig = getNewPeerConfig();
+ ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
- stream.filter(td -> {
- try {
- return tsm.getTableState(td.getTableName()).isEnabled();
- } catch (TableStateNotFoundException e) {
- return false;
- } catch (IOException e) {
- throw new UncheckedIOException(e);
+ for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+ if (!td.hasGlobalReplicationScope()) {
+ continue;
+ }
+ TableName tn = td.getTableName();
+ if (!ReplicationUtils.contains(peerConfig, tn)) {
+ continue;
+ }
+ if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
+ ReplicationUtils.contains(oldPeerConfig, tn)) {
+ continue;
}
- }).forEach(td -> {
try {
- addChildProcedure(env.getAssignmentManager().createReopenProcedures(
- env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
+ if (!tsm.getTableState(tn).isEnabled()) {
+ continue;
+ }
+ } catch (TableStateNotFoundException e) {
+ continue;
}
- });
+ addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+ env.getAssignmentManager().getRegionStates().getRegionsOfTable(tn)));
+ }
}
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
- if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+ if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}
- private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
- throws IOException, ReplicationException {
- Stream<TableDescriptor> stream = getTables(env);
+ protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+ ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+ Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+ for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+ if (!td.hasGlobalReplicationScope()) {
+ continue;
+ }
+ TableName tn = td.getTableName();
+ if (!ReplicationUtils.contains(peerConfig, tn)) {
+ continue;
+ }
+ setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+ }
+ if (!lastSeqIds.isEmpty()) {
+ env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+ }
+ }
+
+ // Puts the encodedRegionName->lastPushedSeqId pairs into the map passed in. Once the map is
+ // large enough we call queueStorage.setLastSequenceIds and clear the map, so when this method
+ // returns the caller must check whether the map is non-empty and, if so, call
+ // queueStorage.setLastSequenceIds to write out the remaining entries.
+ protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+ Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
RegionStates regionStates = env.getAssignmentManager().getRegionStates();
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- Map<String, Long> lastSeqIds = new HashMap<String, Long>();
- stream.forEach(td -> {
- try {
- if (tsm.getTableState(td.getTableName()).isEnabled()) {
- for (Pair<String, Long> name2Barrier : MetaTableAccessor
- .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
- addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
- queueStorage);
- }
- } else {
- for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
- long maxSequenceId =
- WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
- addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
- }
- }
- } catch (IOException | ReplicationException e) {
- throw new RuntimeException(e);
+ boolean isTableEnabled;
+ try {
+ isTableEnabled = tsm.getTableState(tableName).isEnabled();
+ } catch (TableStateNotFoundException e) {
+ return;
+ }
+ if (isTableEnabled) {
+ for (Pair<String, Long> name2Barrier : MetaTableAccessor
+ .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+ addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+ queueStorage);
+ }
+ } else {
+ for (RegionInfo region : regionStates.getRegionsOfTable(tableName, true)) {
+ long maxSequenceId =
+ WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+ addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
}
- });
- if (!lastSeqIds.isEmpty()) {
- queueStorage.setLastSequenceIds(peerId, lastSeqIds);
}
}
@@ -232,8 +247,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
refreshPeer(env, getPeerOperationType());
- setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
- : PeerModificationState.POST_PEER_MODIFICATION);
+ setNextState(nextStateAfterRefresh());
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_REOPEN_REGIONS:
try {
@@ -246,7 +260,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
- setLastSequenceIdForSerialPeer(env);
+ updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
peerId, e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index ccfd4a0..39c8fa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -18,6 +18,14 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -25,11 +33,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UpdatePeerConfigStateData;
/**
@@ -59,12 +69,84 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
return PeerOperationType.UPDATE_CONFIG;
}
+ private void addToList(List<String> encodedRegionNames, String encodedRegionName,
+ ReplicationQueueStorage queueStorage) throws ReplicationException {
+ encodedRegionNames.add(encodedRegionName);
+ if (encodedRegionNames.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+ queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+ encodedRegionNames.clear();
+ }
+ }
+
+ @Override
+ protected PeerModificationState nextStateAfterRefresh() {
+ if (peerConfig.isSerial()) {
+ if (oldPeerConfig.isSerial()) {
+ // both serial, then if the ns/table-cfs configs are not changed, just go with the normal
+ // way, otherwise we need to reopen the regions for the newly added tables.
+ return ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig)
+ ? super.nextStateAfterRefresh()
+ : PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+ } else {
+ // we change the peer to serial, need to reopen all regions
+ return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+ }
+ } else {
+ if (oldPeerConfig.isSerial()) {
+ // we remove the serial flag for peer, then we do not need to reopen all regions, but we
+ // need to remove the last pushed sequence ids.
+ return PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID;
+ } else {
+ // not serial for both, just go with the normal way.
+ return super.nextStateAfterRefresh();
+ }
+ }
+ }
+
@Override
- protected boolean reopenRegionsAfterRefresh() {
- // If we remove some tables from the peer config then we do not need to enter the extra states
- // for serial replication. Could try to optimize later since it is not easy to determine this...
- return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
- !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+ protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
+ if (!oldPeerConfig.isSerial()) {
+ assert peerConfig.isSerial();
+ // change to serial
+ setLastPushedSequenceId(env, peerConfig);
+ return;
+ }
+ if (!peerConfig.isSerial()) {
+ // remove the serial flag
+ env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
+ return;
+ }
+ // enter here means peerConfig and oldPeerConfig are both serial, let's find out the diffs and
+ // process them
+ ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+ Connection conn = env.getMasterServices().getConnection();
+ Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+ List<String> encodedRegionNames = new ArrayList<>();
+ for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+ if (!td.hasGlobalReplicationScope()) {
+ continue;
+ }
+ TableName tn = td.getTableName();
+ if (ReplicationUtils.contains(oldPeerConfig, tn)) {
+ if (!ReplicationUtils.contains(peerConfig, tn)) {
+ // removed from peer config
+ for (String encodedRegionName : MetaTableAccessor
+ .getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
+ addToList(encodedRegionNames, encodedRegionName, queueStorage);
+ }
+ }
+ } else if (ReplicationUtils.contains(peerConfig, tn)) {
+ // newly added to peer config
+ setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+ }
+ }
+ if (!encodedRegionNames.isEmpty()) {
+ queueStorage.removeLastSequenceIds(peerId, encodedRegionNames);
+ }
+ if (!lastSeqIds.isEmpty()) {
+ queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+ }
}
@Override
@@ -99,7 +181,9 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
- if (enabled && reopenRegionsAfterRefresh()) {
+ // if we need to jump to the special states for serial peers, then we need to disable the peer
+ // first if it is not disabled yet.
+ if (enabled && nextStateAfterRefresh() != super.nextStateAfterRefresh()) {
env.getReplicationPeerManager().disablePeer(peerId);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
index 3b807aa..7a1bc55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestEnableTable.java
@@ -186,8 +186,8 @@ public class TestEnableTable {
fail("Got an exception while deleting " + tableName);
}
int rowCount = 0;
- try (ResultScanner scanner =
- metaTable.getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
+ try (ResultScanner scanner = metaTable
+ .getScanner(MetaTableAccessor.getScanForTableName(TEST_UTIL.getConnection(), tableName))) {
for (Result result : scanner) {
LOG.info("Found when none expected: " + result);
rowCount++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/e821b9ac/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
new file mode 100644
index 0000000..eda15d8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemoveFromSerialReplicationPeer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20296.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRemoveFromSerialReplicationPeer extends SerialReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRemoveFromSerialReplicationPeer.class);
+
+ @Before
+ public void setUp() throws IOException, StreamLacksCapabilityException {
+ setupWALWriter();
+ }
+
+ private void waitUntilHasLastPushedSequenceId(RegionInfo region) throws Exception {
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID) > 0;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Still no last pushed sequence id for " + region;
+ }
+ });
+ }
+
+ @Test
+ public void testRemoveTable() throws Exception {
+ TableName tableName = createTable();
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).setSerial(true).build();
+ UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+ waitUntilHasLastPushedSequenceId(region);
+
+ UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+ ReplicationPeerConfig.newBuilder(peerConfig).setTableCFsMap(Collections.emptyMap()).build());
+
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ assertEquals(HConstants.NO_SEQNUM,
+ queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+ }
+
+ @Test
+ public void testRemoveSerialFlag() throws Exception {
+ TableName tableName = createTable();
+ addPeer(true);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo();
+ waitUntilHasLastPushedSequenceId(region);
+ UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+ .newBuilder(UTIL.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(false).build());
+ waitUntilReplicationDone(100);
+
+ ReplicationQueueStorage queueStorage =
+ UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
+ assertEquals(HConstants.NO_SEQNUM,
+ queueStorage.getLastSequenceId(region.getEncodedName(), PEER_ID));
+ }
+}