You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/18 13:39:05 UTC
[hbase] branch HBASE-27109/table_based_rqs updated: HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-27109/table_based_rqs by this push:
new 1f14f4ae436 HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
1f14f4ae436 is described below
commit 1f14f4ae43632f5eaa81ff7e9af066f35ff97ed4
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Mar 18 21:38:53 2023 +0800
HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
Signed-off-by: Liangjun He <he...@apache.org>
---
.../org/apache/hadoop/hbase/util/JsonMapper.java | 4 +
.../protobuf/server/master/MasterProcedure.proto | 1 +
.../hbase/replication/ReplicationQueueStorage.java | 21 ++
.../replication/ReplicationStorageFactory.java | 27 +-
.../replication/TableReplicationQueueStorage.java | 20 ++
.../org/apache/hadoop/hbase/master/HMaster.java | 41 +++
.../AssignReplicationQueuesProcedure.java | 48 ++-
.../ClaimReplicationQueueRemoteProcedure.java | 32 ++
.../OfflineTableReplicationQueueStorage.java | 382 +++++++++++++++++++++
.../master/replication/ReplicationPeerManager.java | 2 +-
.../regionserver/ReplicationSourceManager.java | 188 +++++-----
.../regionserver/ReplicationSyncUp.java | 195 +++++++++--
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 4 +
.../hbase/master/cleaner/TestLogsCleaner.java | 4 +-
.../replication/TestReplicationSyncUpTool.java | 185 +++++-----
.../replication/TestReplicationSyncUpToolBase.java | 3 +-
...estReplicationSyncUpToolWithBulkLoadedData.java | 58 ++--
.../TestTableReplicationQueueStorage.java | 51 +++
...tReplicationSyncUpToolWithMultipleAsyncWAL.java | 3 -
.../TestReplicationSyncUpToolWithMultipleWAL.java | 3 -
.../regionserver/TestReplicationSourceManager.java | 14 +-
.../regionserver/TestSerialReplicationChecker.java | 4 +-
22 files changed, 1025 insertions(+), 265 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
index 0ff131f23bf..f2c4585a6a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java
@@ -40,4 +40,8 @@ public final class JsonMapper {
public static String writeObjectAsString(Object object) throws IOException {
return GSON.toJson(object);
}
+
+ public static <T> T fromJson(String json, Class<T> clazz) {
+ return GSON.fromJson(json, clazz);
+ }
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
index 14d07c17c88..901abf6bd0c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
enum AssignReplicationQueuesState {
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
+ ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
}
message AssignReplicationQueuesStateData {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index 1e36bbeb78f..b5bc64eb55a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -203,4 +203,25 @@ public interface ReplicationQueueStorage {
* Add the given hfile refs to the given peer.
*/
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
+
+ // the below method is for cleaning up stale data after running ReplicationSyncUp
+ /**
+ * Remove all the last sequence ids and hfile references data which are written before the given
+ * timestamp.
+ * <p/>
+ * The data of these two types are not used by replication directly.
+ * <p/>
+ * For last sequence ids, we will check it in serial replication, to make sure that we will
+ * replicate all edits in order, so if there are stale data, the worst case is that we will stop
+ * replicating as we think we still need to finish previous ranges first, although actually we
+ * have already replicated them out.
+ * <p/>
+ * For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
+ * replicate them out, so if there are stale data, the worst case is that we can not remove these
+ * hfiles, although actually they have already been replicated out.
+ * <p/>
+ * So it is OK for us to just bring up the cluster first, and then use this method to delete the
+ * stale data, i.e., the data which are written before a specific timestamp.
+ */
+ void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index dc4317feaa4..4d5fcb45634 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -27,8 +28,11 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Used to create replication storage(peer, queue) classes.
@@ -36,11 +40,15 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationStorageFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);
+
public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name";
public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
+ public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl";
+
public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
throws IOException {
return TableDescriptorBuilder.newBuilder(tableName)
@@ -72,15 +80,26 @@ public final class ReplicationStorageFactory {
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
Configuration conf) {
- return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
- REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
+ return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
+ .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
}
/**
* Create a new {@link ReplicationQueueStorage}.
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
- TableName tableName) {
- return new TableReplicationQueueStorage(conn, tableName);
+ Configuration conf, TableName tableName) {
+ Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
+ TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
+ try {
+ Constructor<? extends ReplicationQueueStorage> c =
+ clazz.getConstructor(Connection.class, TableName.class);
+ return c.newInstance(conn, tableName);
+ } catch (Exception e) {
+ LOG.debug(
+ "failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
+ e);
+ return ReflectionUtils.newInstance(clazz, conf, tableName);
+ }
}
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index f3870f4d09d..e59edd52f79 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -594,4 +594,24 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
throw new ReplicationException("failed to batch update hfile references", e);
}
}
+
+ @Override
+ public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
+ try (Table table = conn.getTable(tableName);
+ ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
+ .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
+ for (;;) {
+ Result r = scanner.next();
+ if (r == null) {
+ break;
+ }
+ Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
+ .addFamily(HFILE_REF_FAMILY, ts);
+ table.delete(delete);
+ }
+ } catch (IOException e) {
+ throw new ReplicationException(
+ "failed to remove last sequence ids and hfile references before timestamp " + ts, e);
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 67d0f889d64..f2086393e63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -34,6 +34,9 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -59,6 +62,7 @@ import java.util.stream.Collectors;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
@@ -226,6 +230,8 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@@ -246,6 +252,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
@@ -267,7 +274,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
@@ -1278,6 +1287,38 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
status.setStatus("Initializing MOB Cleaner");
initMobCleaner();
+ // delete the stale data for replication sync up tool if necessary
+ status.setStatus("Cleanup ReplicationSyncUp status if necessary");
+ Path replicationSyncUpInfoFile =
+ new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
+ if (dataFs.exists(replicationSyncUpInfoFile)) {
+ // info file is available, load the timestamp and use it to clean up stale data in replication
+ // queue storage.
+ byte[] data;
+ try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
+ data = ByteStreams.toByteArray(in);
+ }
+ ReplicationSyncUpToolInfo info = null;
+ try {
+ info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
+ } catch (JsonParseException e) {
+ // usually this should be a partial file, which means the ReplicationSyncUp tool did not
+ // finish properly, so not a problem. Here we do not clean up the status as we do not know
+ // the reason why the tool did not finish properly, so let users clean the status up
+ // manually
+ LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
+ }
+ if (info != null) {
+ LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
+ info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
+ .format(Instant.ofEpochMilli(info.getStartTimeMs())));
+ replicationPeerManager.getQueueStorage()
+ .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
+ // delete the file after removing the stale data, so next time we do not need to do this
+ // again.
+ dataFs.delete(replicationSyncUpInfoFile, false);
+ }
+ }
status.setStatus("Calling postStartMaster coprocessors");
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
index d33259dd436..b547c87009d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java
@@ -24,7 +24,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -102,7 +105,7 @@ public class AssignReplicationQueuesProcedure
}
}
- private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
+ private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
@@ -130,18 +133,51 @@ public class AssignReplicationQueuesProcedure
return Flow.HAS_MORE_STATE;
}
+ // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
+ // claiming the replication queues and delete them instead.
+ private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+ MasterFileSystem mfs = env.getMasterFileSystem();
+ Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+ return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
+ }
+
+ private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
+ ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
+ for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
+ storage.removeQueue(queueId);
+ }
+ MasterFileSystem mfs = env.getMasterFileSystem();
+ Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+ // remove the region server record file
+ mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
+ }
+
@Override
protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
- addMissingQueues(env);
- retryCounter = null;
- setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
- return Flow.HAS_MORE_STATE;
+ if (shouldSkip(env)) {
+ setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+ return Flow.HAS_MORE_STATE;
+ } else {
+ addMissingQueues(env);
+ retryCounter = null;
+ setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
+ return Flow.HAS_MORE_STATE;
+ }
case ASSIGN_REPLICATION_QUEUES_CLAIM:
- return claimQueues(env);
+ if (shouldSkip(env)) {
+ retryCounter = null;
+ setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
+ return Flow.HAS_MORE_STATE;
+ } else {
+ return claimQueues(env);
+ }
+ case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
+ removeQueues(env);
+ return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
index 7b637384398..d3aeeba541a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
@@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Optional;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +60,32 @@ public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
this.targetServer = targetServer;
}
+ // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
+ // claiming the replication queues and delete them instead.
+ private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
+ MasterFileSystem mfs = env.getMasterFileSystem();
+ Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
+ return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
+ }
+
+ @Override
+ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ try {
+ if (shouldSkip(env)) {
+ LOG.info("Skip claiming {} because replication sync up has already done it for us",
+ getServerName());
+ return null;
+ }
+ } catch (IOException e) {
+ LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
+ getServerName(), e);
+ // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
+ return null;
+ }
+ return super.execute(env);
+ }
+
@Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
assert targetServer.equals(remote);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
new file mode 100644
index 00000000000..9faca74f710
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@InterfaceAudience.Private
+public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage {
+
+ private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets =
+ new HashMap<>();
+
+ private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>();
+
+ private final Map<String, Set<String>> hfileRefs = new HashMap<>();
+
+ private void loadRegionInfo(FileSystem fs, Path regionDir,
+ NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException {
+ RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+ // TODO: we consider that there will not be too many regions for the hbase:replication table, so
+ // here we just iterate over all the regions to find out the overlapped ones. Can be optimized
+ // later.
+ Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<byte[], RegionInfo> entry = iter.next();
+ if (hri.isOverlap(entry.getValue())) {
+ if (hri.getRegionId() > entry.getValue().getRegionId()) {
+ // we are newer, remove the old hri, we can not break here as if hri is a merged region,
+ // we need to remove all its parent regions.
+ iter.remove();
+ } else {
+ // we are older, just return, skip the below add
+ return;
+ }
+ }
+
+ }
+ startKey2RegionInfo.put(hri.getStartKey(), hri);
+ }
+
+ private void loadOffsets(Result result) {
+ NavigableMap<byte[], byte[]> map =
+ result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY);
+ if (map == null || map.isEmpty()) {
+ return;
+ }
+ Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>();
+ map.forEach((k, v) -> {
+ String walGroup = Bytes.toString(k);
+ ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
+ offsetMap.put(walGroup, offset);
+ });
+ ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
+ offsets.put(queueId, offsetMap);
+ }
+
+ private void loadLastSequenceIds(Result result) {
+ NavigableMap<byte[], byte[]> map =
+ result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY);
+ if (map == null || map.isEmpty()) {
+ return;
+ }
+ Map<String, Long> lastSeqIdMap = new HashMap<>();
+ map.forEach((k, v) -> {
+ String encodedRegionName = Bytes.toString(k);
+ long lastSeqId = Bytes.toLong(v);
+ lastSeqIdMap.put(encodedRegionName, lastSeqId);
+ });
+ String peerId = Bytes.toString(result.getRow());
+ lastSequenceIds.put(peerId, lastSeqIdMap);
+ }
+
+ private void loadHFileRefs(Result result) {
+ NavigableMap<byte[], byte[]> map =
+ result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY);
+ if (map == null || map.isEmpty()) {
+ return;
+ }
+ Set<String> refs = new HashSet<>();
+ map.keySet().forEach(ref -> refs.add(Bytes.toString(ref)));
+ String peerId = Bytes.toString(result.getRow());
+ hfileRefs.put(peerId, refs);
+ }
+
+ private void loadReplicationQueueData(Configuration conf, TableName tableName)
+ throws IOException {
+ Path rootDir = CommonFSUtils.getRootDir(conf);
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
+ FileSystem fs = tableDir.getFileSystem(conf);
+ FileStatus[] regionDirs =
+ CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
+ if (regionDirs == null) {
+ return;
+ }
+ NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (FileStatus regionDir : regionDirs) {
+ loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo);
+ }
+ TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+ for (RegionInfo hri : startKey2RegionInfo.values()) {
+ try (ClientSideRegionScanner scanner =
+ new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) {
+ for (;;) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ loadOffsets(result);
+ loadLastSequenceIds(result);
+ loadHFileRefs(result);
+ }
+ }
+ }
+ }
+
+ public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName)
+ throws IOException {
+ loadReplicationQueueData(conf, tableName);
+ }
+
+ @Override
+ public synchronized void setOffset(ReplicationQueueId queueId, String walGroup,
+ ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
+ Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
+ if (offsetMap == null) {
+ offsetMap = new HashMap<>();
+ offsets.put(queueId, offsetMap);
+ }
+ offsetMap.put(walGroup, offset);
+ Map<String, Long> lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId());
+ if (lastSeqIdsMap == null) {
+ lastSeqIdsMap = new HashMap<>();
+ lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap);
+ }
+ for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
+ Long oldSeqId = lastSeqIdsMap.get(entry.getKey());
+ if (oldSeqId == null || oldSeqId < entry.getValue()) {
+ lastSeqIdsMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
+ throws ReplicationException {
+ Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
+ if (offsetMap == null) {
+ return Collections.emptyMap();
+ }
+ return ImmutableMap.copyOf(offsetMap);
+ }
+
+ @Override
+ public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
+ throws ReplicationException {
+ return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
+ throws ReplicationException {
+ return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
+ throws ReplicationException {
+ return offsets.keySet().stream()
+ .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException {
+ return offsets.entrySet().stream()
+ .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue())))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized List<ServerName> listAllReplicators() throws ReplicationException {
+ return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct()
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
+ ServerName targetServerName) throws ReplicationException {
+ Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId);
+ if (offsetMap == null) {
+ return Collections.emptyMap();
+ }
+ offsets.put(queueId.claim(targetServerName), offsetMap);
+ return ImmutableMap.copyOf(offsetMap);
+ }
+
+ @Override
+ public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
+ offsets.remove(queueId);
+ }
+
+ @Override
+ public synchronized void removeAllQueues(String peerId) throws ReplicationException {
+ Iterator<ReplicationQueueId> iter = offsets.keySet().iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getPeerId().equals(peerId)) {
+ iter.remove();
+ }
+ }
+ }
+
+ @Override
+ public synchronized long getLastSequenceId(String encodedRegionName, String peerId)
+ throws ReplicationException {
+ Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+ if (lastSeqIdMap == null) {
+ return HConstants.NO_SEQNUM;
+ }
+ Long lastSeqId = lastSeqIdMap.get(encodedRegionName);
+ return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM;
+ }
+
+ @Override
+ public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+ throws ReplicationException {
+ Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+ if (lastSeqIdMap == null) {
+ lastSeqIdMap = new HashMap<>();
+ lastSequenceIds.put(peerId, lastSeqIdMap);
+ }
+ lastSeqIdMap.putAll(lastSeqIds);
+ }
+
+ @Override
+ public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException {
+ lastSequenceIds.remove(peerId);
+ }
+
+ @Override
+ public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+ throws ReplicationException {
+ Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
+ if (lastSeqIdMap == null) {
+ return;
+ }
+ for (String encodedRegionName : encodedRegionNames) {
+ lastSeqIdMap.remove(encodedRegionName);
+ }
+ }
+
+ @Override
+ public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException {
+ hfileRefs.remove(peerId);
+ }
+
+ @Override
+ public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+ throws ReplicationException {
+ Set<String> refs = hfileRefs.get(peerId);
+ if (refs == null) {
+ refs = new HashSet<>();
+ hfileRefs.put(peerId, refs);
+ }
+ for (Pair<Path, Path> pair : pairs) {
+ refs.add(pair.getSecond().getName());
+ }
+ }
+
+ @Override
+ public synchronized void removeHFileRefs(String peerId, List<String> files)
+ throws ReplicationException {
+ Set<String> refs = hfileRefs.get(peerId);
+ if (refs == null) {
+ return;
+ }
+ refs.removeAll(files);
+ }
+
+ @Override
+ public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
+ return ImmutableList.copyOf(hfileRefs.keySet());
+ }
+
+ @Override
+ public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException {
+ Set<String> refs = hfileRefs.get(peerId);
+ if (refs == null) {
+ return Collections.emptyList();
+ }
+ return ImmutableList.copyOf(refs);
+ }
+
+ @Override
+ public synchronized Set<String> getAllHFileRefs() throws ReplicationException {
+ return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ }
+
+ @Override
+ public boolean hasData() throws ReplicationException {
+ return true;
+ }
+
+ @Override
+ public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+ throws ReplicationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void batchUpdateLastSequenceIds(
+ List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds)
+ throws ReplicationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+ throws ReplicationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index d8c1b5c64c5..bb170be64af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -641,7 +641,7 @@ public class ReplicationPeerManager {
};
}
return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
- services.getConnection(), replicationQueueTableName), initializer);
+ services.getConnection(), conf, replicationQueueTableName), initializer);
}
public static ReplicationPeerManager create(MasterServices services, String clusterId)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f3d07315240..f34f0d194e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -25,7 +25,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -125,6 +124,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
*/
@InterfaceAudience.Private
public class ReplicationSourceManager {
+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one replication source
private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -146,13 +146,15 @@ public class ReplicationSourceManager {
// All logs we are currently tracking
// Index structure of the map is: queue_id->logPrefix/logGroup->logs
- // For normal replication source, the peer id is same with the queue id
private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
// the map is: queue_id->logPrefix/logGroup->logs
- // For recovered source, the queue id's format is peer_id-servername-*
+ // for recovered source, the WAL files should already have been moved to oldLogDir, and we have
+ // different layout of old WAL files, for example, with server name sub directories or not, so
+ // here we record the full path instead of just the name, so when refreshing we can enqueue the
+ // WAL file again, without trying to guess the real path of the WAL files.
private final ConcurrentMap<ReplicationQueueId,
- Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
+ Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
@@ -514,9 +516,9 @@ public class ReplicationSourceManager {
ReplicationSourceInterface recoveredReplicationSource =
createRefreshedSource(oldSourceQueueId, peer);
this.oldsources.add(recoveredReplicationSource);
- for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
+ for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
.values()) {
- walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
+ walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
}
toStartup.add(recoveredReplicationSource);
}
@@ -656,9 +658,11 @@ public class ReplicationSourceManager {
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (source.isRecovered()) {
- NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
+ NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
- NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
+ // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
+ NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
+ .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
if (walsToRemove.isEmpty()) {
return;
}
@@ -814,6 +818,93 @@ public class ReplicationSourceManager {
}
void claimQueue(ReplicationQueueId queueId) {
+ claimQueue(queueId, false);
+ }
+
+ // sorted from oldest to newest
+ private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
+ Map<String, ReplicationGroupOffset> offsets) throws IOException {
+ List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
+ URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
+ if (syncUp) {
+ // we also need to list WALs directory for ReplicationSyncUp
+ walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
+ }
+ PriorityQueue<Path> walFilesPQ =
+ new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
+ // sort the wal files and also filter out replicated files
+ for (Path file : walFiles) {
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
+ ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
+ if (shouldReplicate(groupOffset, file.getName())) {
+ walFilesPQ.add(file);
+ } else {
+ LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
+ groupOffset);
+ }
+ }
+ return walFilesPQ;
+ }
+
+ private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
+ ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
+ ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
+ if (peer == null || peer != oldPeer) {
+ src.terminate("Recovered queue doesn't belong to any current peer");
+ deleteQueue(claimedQueueId);
+ return;
+ }
+ // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
+ // transiting to STANDBY state. The only exception is we are in STANDBY state and
+ // transiting to DA, under this state we will replay the remote WAL and they need to be
+ // replicated back.
+ if (peer.getPeerConfig().isSyncReplication()) {
+ Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+ peer.getSyncReplicationStateAndNewState();
+ if (
+ (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
+ && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
+ || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
+ ) {
+ src.terminate("Sync replication peer is in STANDBY state");
+ deleteQueue(claimedQueueId);
+ return;
+ }
+ }
+ // track sources in walsByIdRecoveredQueues
+ Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
+ walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
+ for (Path wal : walFiles) {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
+ NavigableSet<Path> wals = walsByGroup.get(walPrefix);
+ if (wals == null) {
+ wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
+ walsByGroup.put(walPrefix, wals);
+ }
+ wals.add(wal);
+ }
+ oldsources.add(src);
+ LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId,
+ walFiles.size());
+ for (Path wal : walFiles) {
+ LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
+ src.enqueueLog(wal);
+ }
+ src.startup();
+ }
+
+ /**
+ * Claim a replication queue.
+ * <p/>
+ * We add a flag to indicate whether we are called by ReplicationSyncUp. For normal claiming queue
+ * operation, we are the last step of a SCP, so we can assume that all the WAL files are under
+ * oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a
+ * region server which has not been processed by SCP yet, so we still need to look at its WALs
+ * directory.
+ * @param queueId the replication queue id we want to claim
+ * @param syncUp whether we are called by ReplicationSyncUp
+ */
+ void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
@@ -872,76 +963,17 @@ public class ReplicationSourceManager {
server.abort("Failed to create replication source after claiming queue.", e);
return;
}
- List<Path> walFiles;
+ PriorityQueue<Path> walFiles;
try {
- walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
- URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
+ walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
} catch (IOException e) {
- LOG.error("Can not list all wal files for peer {} and queue {}", peerId, queueId, e);
- server.abort("Can not list all wal files after claiming queue.", e);
+ LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
+ server.abort("Can not list wal files after claiming queue.", e);
return;
}
- PriorityQueue<Path> walFilesPQ = new PriorityQueue<>(
- Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
- .thenComparing(Path::getName));
- // sort the wal files and also filter out replicated files
- for (Path file : walFiles) {
- String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
- ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
- if (shouldReplicate(groupOffset, file.getName())) {
- walFilesPQ.add(file);
- } else {
- LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
- groupOffset);
- }
- }
- // the method is a bit long, so assign it to null here to avoid later we reuse it again by
- // mistake, we should use the sorted walFilesPQ instead
- walFiles = null;
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) {
- peer = replicationPeers.getPeer(src.getPeerId());
- if (peer == null || peer != oldPeer) {
- src.terminate("Recovered queue doesn't belong to any current peer");
- deleteQueue(claimedQueueId);
- return;
- }
- // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
- // transiting to STANDBY state. The only exception is we are in STANDBY state and
- // transiting to DA, under this state we will replay the remote WAL and they need to be
- // replicated back.
- if (peer.getPeerConfig().isSyncReplication()) {
- Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
- peer.getSyncReplicationStateAndNewState();
- if (
- (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
- && stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
- || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
- ) {
- src.terminate("Sync replication peer is in STANDBY state");
- deleteQueue(claimedQueueId);
- return;
- }
- }
- // track sources in walsByIdRecoveredQueues
- Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
- walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
- for (Path wal : walFilesPQ) {
- String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
- NavigableSet<String> wals = walsByGroup.get(walPrefix);
- if (wals == null) {
- wals = new TreeSet<>();
- walsByGroup.put(walPrefix, wals);
- }
- wals.add(wal.getName());
- }
- oldsources.add(src);
- LOG.info("Added source for recovered queue {}", claimedQueueId);
- for (Path wal : walFilesPQ) {
- LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
- src.enqueueLog(new Path(oldLogDir, wal));
- }
- src.startup();
+ addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
}
}
@@ -970,16 +1002,6 @@ public class ReplicationSourceManager {
return Collections.unmodifiableMap(walsById);
}
- /**
- * Get a copy of the wals of the recovered sources on this rs
- * @return a sorted set of wal names
- */
- @RestrictedApi(explanation = "Should only be called in tests", link = "",
- allowedOnPath = ".*/src/test/.*")
- Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
- return Collections.unmodifiableMap(walsByIdRecoveredQueues);
- }
-
/**
* Get a list of all the normal sources of this rs
* @return list of all normal sources
@@ -1099,8 +1121,6 @@ public class ReplicationSourceManager {
return this.globalMetrics;
}
- @RestrictedApi(explanation = "Should only be called in tests", link = "",
- allowedOnPath = ".*/src/test/.*")
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index b63ad473719..f071cf6f1f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
@@ -35,11 +39,18 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.JsonMapper;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -59,6 +70,31 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ReplicationSyncUp extends Configured implements Tool {
+ public static class ReplicationSyncUpToolInfo {
+
+ private long startTimeMs;
+
+ public ReplicationSyncUpToolInfo() {
+ }
+
+ public ReplicationSyncUpToolInfo(long startTimeMs) {
+ this.startTimeMs = startTimeMs;
+ }
+
+ public long getStartTimeMs() {
+ return startTimeMs;
+ }
+
+ public void setStartTimeMs(long startTimeMs) {
+ this.startTimeMs = startTimeMs;
+ }
+ }
+
+ // For storing the information used to skip replicating some wals after the cluster is back online
+ public static final String INFO_DIR = "ReplicationSyncUp";
+
+ public static final String INFO_FILE = "info";
+
private static final long SLEEP_TIME = 10000;
/**
@@ -69,41 +105,116 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.exit(ret);
}
- private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException {
- List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
- return rsZNodes == null
- ? Collections.emptySet()
- : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+ // Find region servers under wal directory
+ // Here we only care about the region servers which may still be alive, as we need to add
+ // replication queues for them if missing. The dead region servers which have already been processed
+ // fully do not need to add their replication queues again, as the operation has already been done
+ // in SCP.
+ private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException {
+ FileStatus[] statuses;
+ try {
+ statuses = walFs.listStatus(walDir);
+ } catch (FileNotFoundException e) {
+ System.out.println("WAL directory " + walDir + " does not exists, ignore");
+ return Collections.emptySet();
+ }
+ Set<ServerName> regionServers = new HashSet<>();
+ for (FileStatus status : statuses) {
+ // All WAL files under the walDir are within their region server's directory
+ if (!status.isDirectory()) {
+ continue;
+ }
+ ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath());
+ if (sn != null) {
+ regionServers.add(sn);
+ }
+ }
+ return regionServers;
+ }
+
+ private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer,
+ Set<String> peerIds) throws ReplicationException {
+ Set<String> existingQueuePeerIds = new HashSet<>();
+ List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer);
+ for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
+ ReplicationQueueId queueId = iter.next();
+ if (!queueId.isRecovered()) {
+ existingQueuePeerIds.add(queueId.getPeerId());
+ }
+ }
+
+ for (String peerId : peerIds) {
+ if (!existingQueuePeerIds.contains(peerId)) {
+ ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId);
+ System.out.println("Add replication queue " + queueId + " for claiming");
+ storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN,
+ Collections.emptyMap());
+ }
+ }
+ }
+
+ private void addMissingReplicationQueues(ReplicationQueueStorage storage,
+ Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException {
+ for (ServerName regionServer : regionServers) {
+ addMissingReplicationQueues(storage, regionServer, peerIds);
+ }
}
// When using this tool, usually the source cluster is unhealthy, so we should try to claim the
// replication queues for the dead region servers first and then replicate the data out.
- private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr)
- throws ReplicationException, KeeperException {
- // TODO: reimplement this tool
- // List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators();
- // Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
- // for (ServerName sn : replicators) {
- // if (!liveRegionServers.contains(sn)) {
- // List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn);
- // System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
- // for (String queue : replicationQueues) {
- // mgr.claimQueue(sn, queue);
- // }
- // }
- // }
+ private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers)
+ throws ReplicationException, KeeperException, IOException {
+ // union the region servers from both places, i.e, from the wal directory, and the records in
+ // replication queue storage.
+ Set<ServerName> replicators = new HashSet<>(regionServers);
+ ReplicationQueueStorage queueStorage = mgr.getQueueStorage();
+ replicators.addAll(queueStorage.listAllReplicators());
+ FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
+ Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
+ for (ServerName sn : replicators) {
+ List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn);
+ System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
+ // record the rs name, so when the master restarts, we will skip claiming its replication queue
+ fs.createNewFile(new Path(infoDir, sn.getServerName()));
+ for (ReplicationQueueId queueId : replicationQueues) {
+ mgr.claimQueue(queueId, true);
+ }
+ }
+ }
+
+ private void writeInfoFile(FileSystem fs) throws IOException {
+ // Record the info of this run. Currently only record the time we run the job. We will use this
+ // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
+ // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
+ ReplicationSyncUpToolInfo info =
+ new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
+ String json = JsonMapper.writeObjectAsString(info);
+ Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
+ try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+ out.write(Bytes.toBytes(json));
+ }
}
@Override
public int run(String[] args) throws Exception {
Abortable abortable = new Abortable() {
+
+ private volatile boolean abort = false;
+
@Override
public void abort(String why, Throwable e) {
+ if (isAborted()) {
+ return;
+ }
+ abort = true;
+ System.err.println("Aborting because of " + why);
+ e.printStackTrace();
+ System.exit(1);
}
@Override
public boolean isAborted() {
- return false;
+ return abort;
}
};
Configuration conf = getConf();
@@ -114,16 +225,24 @@ public class ReplicationSyncUp extends Configured implements Tool {
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
- System.out.println("Start Replication Server start");
+ System.out.println("Start Replication Server");
+ writeInfoFile(fs);
Replication replication = new Replication();
- replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
+ // use offline table replication queue storage
+ getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
+ OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
+ DummyServer server = new DummyServer(getConf(), zkw);
+ replication.initialize(server, fs, new Path(logDir, server.toString()), oldLogDir,
new WALFactory(conf,
ServerName
.valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
null, false));
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init();
- claimReplicationQueues(zkw, manager);
+ Set<ServerName> regionServers = listRegionServers(fs, logDir);
+ addMissingReplicationQueues(manager.getQueueStorage(), regionServers,
+ manager.getReplicationPeers().getAllPeerIds());
+ claimReplicationQueues(manager, regionServers);
while (manager.activeFailoverTaskCount() > 0) {
Thread.sleep(SLEEP_TIME);
}
@@ -138,23 +257,22 @@ public class ReplicationSyncUp extends Configured implements Tool {
return 0;
}
- class DummyServer implements Server {
- String hostname;
- ZKWatcher zkw;
+ private static final class DummyServer implements Server {
+ private final Configuration conf;
+ private final String hostname;
+ private final ZKWatcher zkw;
+ private volatile boolean abort = false;
- DummyServer(ZKWatcher zkw) {
+ DummyServer(Configuration conf, ZKWatcher zkw) {
// a unique name in case the first run fails
hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
+ this.conf = conf;
this.zkw = zkw;
}
- DummyServer(String hostname) {
- this.hostname = hostname;
- }
-
@Override
public Configuration getConfiguration() {
- return getConf();
+ return conf;
}
@Override
@@ -174,11 +292,18 @@ public class ReplicationSyncUp extends Configured implements Tool {
@Override
public void abort(String why, Throwable e) {
+ if (isAborted()) {
+ return;
+ }
+ abort = true;
+ System.err.println("Aborting because of " + why);
+ e.printStackTrace();
+ System.exit(1);
}
@Override
public boolean isAborted() {
- return false;
+ return abort;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index dce58dbfae4..129636275e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -311,6 +311,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return matcher.matches() ? Long.parseLong(matcher.group(2)) : NO_TIMESTAMP;
}
+ public static final Comparator<Path> TIMESTAMP_COMPARATOR =
+ Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
+ .thenComparing(Path::getName);
+
/**
* Construct the directory name for all WALs on a given server. Dir names currently look like this
* for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index d7ba6c227c6..5d474bc2164 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -127,8 +127,8 @@ public class TestLogsCleaner {
TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
TEST_UTIL.getAdmin().createTable(td);
TEST_UTIL.waitTableAvailable(tableName);
- queueStorage =
- ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+ queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
+ conf, tableName);
masterServices = mock(MasterServices.class);
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 7a89af15902..38225613b9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -22,24 +22,28 @@ import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
@@ -55,39 +59,70 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
*/
@Test
public void testSyncUpTool() throws Exception {
-
- /**
- * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
- * 'cf1' : replicated 'norep': not replicated
- */
+ // Set up Replication: on Master and one Slave
+ // Table: t1_syncup and t2_syncup
+ // columnfamily:
+ // 'cf1' : replicated
+ // 'norep': not replicated
setupReplication();
- /**
- * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows
- * into cf1, and 1 rows into norep verify correctly replicated to slave
- */
+ //
+ // at Master:
+ // t1_syncup: put 100 rows into cf1, and 1 row into norep
+ // t2_syncup: put 200 rows into cf1, and 1 row into norep
+ //
+ // verify correctly replicated to slave
putAndReplicateRows();
- /**
- * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows
- * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on
- * master, restart hbase on Slave step 4: verify Slave still have the rows before delete
- * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master
- * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows
- * from cf1 verify correctly replicated to Slave
- */
+ // Verify delete works
+ //
+ // step 1: stop hbase on Slave
+ //
+ // step 2: at Master:
+ // t1_syncup: delete 50 rows from cf1
+ // t2_syncup: delete 100 rows from cf1
+ // no change on 'norep'
+ //
+ // step 3: stop hbase on master, restart hbase on Slave
+ //
+ // step 4: verify Slave still has the rows before delete
+ // t1_syncup: 100 rows from cf1
+ // t2_syncup: 200 rows from cf1
+ //
+ // step 5: run syncup tool on Master
+ //
+ // step 6: verify that delete show up on Slave
+ // t1_syncup: 50 rows from cf1
+ // t2_syncup: 100 rows from cf1
+ //
+ // verify correctly replicated to Slave
mimicSyncUpAfterDelete();
- /**
- * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from
- * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will
- * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will
- * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
- * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step
- * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not
- * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to
- * Slave
- */
+ // Verify put works
+ //
+ // step 1: stop hbase on Slave
+ //
+ // step 2: at Master:
+ // t1_syncup: put 100 rows from cf1
+ // t2_syncup: put 200 rows from cf1
+ // and put another row on 'norep'
+ // ATTN:
+ // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
+ // put to 'norep' will add a new row.
+ //
+ // step 3: stop hbase on master, restart hbase on Slave
+ //
+ // step 4: verify Slave still has the rows before put
+ // t1_syncup: 50 rows from cf1
+ // t2_syncup: 100 rows from cf1
+ //
+ // step 5: run syncup tool on Master
+ //
+ // step 6: verify that put show up on Slave and 'norep' does not
+ // t1_syncup: 100 rows from cf1
+ // t2_syncup: 200 rows from cf1
+ //
+ // verify correctly replicated to Slave
mimicSyncUpAfterPut();
}
@@ -172,7 +207,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
int rowCount_ht2Source = countRows(ht2Source);
assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
rowCount_ht2Source);
-
+ List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
+ .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
shutDownSourceHBaseCluster();
restartTargetHBaseCluster(1);
@@ -184,40 +220,33 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
+ syncUp(UTIL1);
+
// After sync up
- for (int i = 0; i < NB_RETRIES; i++) {
- syncUp(UTIL1);
- rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
- rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
- if (i == NB_RETRIES - 1) {
- if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
- // syncUP still failed. Let's look at the source in case anything wrong there
- restartSourceHBaseCluster(1);
- rowCount_ht1Source = countRows(ht1Source);
- LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
- rowCount_ht2Source = countRows(ht2Source);
- LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
- }
- assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
- rowCountHt1TargetAtPeer1);
- assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
- rowCountHt2TargetAtPeer1);
- }
- if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
- LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
- break;
- } else {
- LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
- + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
- + rowCountHt2TargetAtPeer1);
- }
- Thread.sleep(SLEEP_TIME);
+ rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
+ rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
+ rowCountHt1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
+ rowCountHt2TargetAtPeer1);
+
+ // check we have recorded the dead region servers and also have an info file
+ Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+ Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+ FileSystem fs = UTIL1.getTestFileSystem();
+ for (ServerName sn : sourceRses) {
+ assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
}
+ assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
+ assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);
+
+ restartSourceHBaseCluster(1);
+ // should finally remove all the records after restart
+ UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
}
private void mimicSyncUpAfterPut() throws Exception {
LOG.debug("mimicSyncUpAfterPut");
- restartSourceHBaseCluster(1);
shutDownTargetHBaseCluster();
Put p;
@@ -261,34 +290,14 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
rowCountHt2TargetAtPeer1);
- // after syun up
- for (int i = 0; i < NB_RETRIES; i++) {
- syncUp(UTIL1);
- rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
- rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
- if (i == NB_RETRIES - 1) {
- if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
- // syncUP still failed. Let's look at the source in case anything wrong there
- restartSourceHBaseCluster(1);
- rowCount_ht1Source = countRows(ht1Source);
- LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
- rowCount_ht2Source = countRows(ht2Source);
- LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
- }
- assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
- rowCountHt1TargetAtPeer1);
- assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
- rowCountHt2TargetAtPeer1);
- }
- if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
- LOG.info("SyncUpAfterPut succeeded at retry = " + i);
- break;
- } else {
- LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
- + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
- + rowCountHt2TargetAtPeer1);
- }
- Thread.sleep(SLEEP_TIME);
- }
+ syncUp(UTIL1);
+
+ // after sync up
+ rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
+ rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
+ rowCountHt1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
+ rowCountHt2TargetAtPeer1);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
index d3142106362..8a28db3b185 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -136,7 +136,8 @@ public abstract class TestReplicationSyncUpToolBase {
}
final void syncUp(HBaseTestingUtil util) throws Exception {
- ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
+ ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
+ new String[0]);
}
// Utilities that manager shutdown / restart of source / sink clusters. They take care of
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index b5de8e6324f..afed0483388 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
@@ -45,14 +46,11 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
@@ -74,40 +72,50 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
@Test
public void testSyncUpTool() throws Exception {
- /**
- * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
- * 'cf1' : replicated 'norep': not replicated
- */
+ // Set up Replication: on Master and one Slave
+ // Table: t1_syncup and t2_syncup
+ // columnfamily:
+ // 'cf1' : replicated
+ // 'norep': not replicated
setupReplication();
- /**
- * Prepare 24 random hfile ranges required for creating hfiles
- */
+ // Prepare 24 random hfile ranges required for creating hfiles
Iterator<String> randomHFileRangeListIterator = null;
Set<String> randomHFileRanges = new HashSet<>(24);
for (int i = 0; i < 24; i++) {
- randomHFileRanges.add(UTIL1.getRandomUUID().toString());
+ randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
randomHFileRangeListIterator = randomHFileRangeList.iterator();
- /**
- * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
- * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
- * rows into norep verify correctly replicated to slave
- */
+ // at Master:
+ // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
+ // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
+ // norep
+ // verify correctly replicated to slave
loadAndReplicateHFiles(true, randomHFileRangeListIterator);
- /**
- * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load
- * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and
- * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
- * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step
- * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not
- * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to
- * Slave
- */
+ // Verify hfile load works
+ //
+ // step 1: stop hbase on Slave
+ //
+ // step 2: at Master:
+ // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
+ // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
+ //
+ // step 3: stop hbase on master, restart hbase on Slave
+ //
+ // step 4: verify Slave still has the rows before load
+ // t1_syncup: 100 rows from cf1
+ // t2_syncup: 200 rows from cf1
+ //
+ // step 5: run syncup tool on Master
+ //
+ // step 6: verify that hfiles show up on Slave and 'norep' does not
+ // t1_syncup: 200 rows from cf1
+ // t2_syncup: 400 rows from cf1
+ // verify correctly replicated to Slave
mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
index 4148c1c1a2c..9041831d0e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@@ -420,4 +421,54 @@ public class TestTableReplicationQueueStorage {
assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
}
+
+ private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
+ throws ReplicationException {
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
+ }
+
+ List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+ files1.add(new Pair<>(null, new Path("file_1")));
+ files1.add(new Pair<>(null, new Path("file_2")));
+ files1.add(new Pair<>(null, new Path("file_3")));
+ storage.addHFileRefs(peerId2, files1);
+ }
+
+ @Test
+ public void testRemoveLastSequenceIdsAndHFileRefsBefore()
+ throws ReplicationException, InterruptedException {
+ String peerId1 = "1";
+ String peerId2 = "2";
+ addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+ // make sure we have written these out
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+ }
+ assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+
+ // should have nothing after removal
+ long ts = EnvironmentEdgeManager.currentTime();
+ storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
+ }
+ assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
+
+ Thread.sleep(100);
+ // add again and remove with the old timestamp
+ addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+ storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+ // make sure we do not delete the data which are written after the given timestamp
+ for (int i = 0; i < 100; i++) {
+ String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+ assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+ }
+ assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
index 28779be4399..83cd41773ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.experimental.categories.Category;
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicationSyncUpTool {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index f495f433bc9..673b841430e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.experimental.categories.Category;
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index b7564ed9168..1bb9a3e2949 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -45,11 +45,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -125,8 +122,6 @@ public class TestReplicationSourceManager {
private static final TableName TABLE_NAME = TableName.valueOf("test");
- private static TableDescriptor TD;
-
private static RegionInfo RI;
private static NavigableMap<byte[], Integer> SCOPES;
@@ -152,10 +147,6 @@ public class TestReplicationSourceManager {
FS = UTIL.getTestFileSystem();
CONF = new Configuration(UTIL.getConfiguration());
CONF.setLong("replication.sleep.before.failover", 0);
- TD = TableDescriptorBuilder.newBuilder(TABLE_NAME)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(F1)
- .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(F2)).build();
RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -176,7 +167,8 @@ public class TestReplicationSourceManager {
when(server.getConfiguration()).thenReturn(CONF);
when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
when(server.getConnection()).thenReturn(UTIL.getConnection());
- when(server.getServerName()).thenReturn(ServerName.valueOf("hostname.example.org", 1234, 1));
+ ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
+ when(server.getServerName()).thenReturn(sn);
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
FS.mkdirs(oldLogDir);
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
@@ -189,7 +181,7 @@ public class TestReplicationSourceManager {
CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
replication = new Replication();
- replication.initialize(server, FS, logDir, oldLogDir,
+ replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
new WALFactory(CONF, server.getServerName(), null, false));
manager = replication.getReplicationManager();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 1544265435c..8731adbe4c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -99,8 +99,8 @@ public class TestSerialReplicationChecker {
TableName repTable = TableName.valueOf("test_serial_rep");
UTIL.getAdmin()
.createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable));
- QUEUE_STORAGE =
- ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), repTable);
+ QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
+ UTIL.getConfiguration(), repTable);
}
@AfterClass