You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/07/10 04:38:32 UTC
[10/50] [abbrv] hbase git commit: HBASE-20569 NPE in RecoverStandbyProcedure.execute
HBASE-20569 NPE in RecoverStandbyProcedure.execute
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44ca13fe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44ca13fe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44ca13fe
Branch: refs/heads/HBASE-18477
Commit: 44ca13fe07dc5050a2bc98ccd3f65953f06aaef8
Parents: 7448b04
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu May 31 20:54:59 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jun 28 18:08:43 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/MasterProcedure.proto | 26 ++-
.../org/apache/hadoop/hbase/master/HMaster.java | 10 +-
.../hadoop/hbase/master/MasterServices.java | 6 +-
.../procedure/MasterProcedureScheduler.java | 3 +-
.../procedure/PeerProcedureInterface.java | 2 +-
.../hbase/master/procedure/PeerQueue.java | 3 +-
.../replication/RecoverStandbyProcedure.java | 68 ++++--
.../master/replication/RemovePeerProcedure.java | 5 +-
.../ReplaySyncReplicationWALManager.java | 169 --------------
.../ReplaySyncReplicationWALProcedure.java | 196 -----------------
.../SyncReplicationReplayWALManager.java | 218 +++++++++++++++++++
.../SyncReplicationReplayWALProcedure.java | 164 ++++++++++++++
...SyncReplicationReplayWALRemoteProcedure.java | 213 ++++++++++++++++++
...ransitPeerSyncReplicationStateProcedure.java | 6 +-
...ZKSyncReplicationReplayWALWorkerStorage.java | 108 +++++++++
.../ReplaySyncReplicationWALCallable.java | 46 ++--
.../hbase/master/MockNoopMasterServices.java | 4 +-
.../replication/SyncReplicationTestBase.java | 6 +-
.../TestSyncReplicationStandbyKillMaster.java | 88 ++++++++
.../TestSyncReplicationStandbyKillRS.java | 119 ++++++++++
.../master/TestRecoverStandbyProcedure.java | 10 +-
21 files changed, 1040 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 23ec8f8..a062e9a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -486,22 +486,34 @@ message TransitPeerSyncReplicationStateStateData {
enum RecoverStandbyState {
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
- INIT_WORKERS = 2;
- DISPATCH_TASKS = 3;
- SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
+ REGISTER_PEER_TO_WORKER_STORAGE = 2;
+ DISPATCH_WALS = 3;
+ UNREGISTER_PEER_FROM_WORKER_STORAGE = 4;
+ SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 5;
+}
+
+enum SyncReplicationReplayWALState {
+ ASSIGN_WORKER = 1;
+ DISPATCH_WALS_TO_WORKER = 2;
+ RELEASE_WORKER = 3;
}
message RecoverStandbyStateData {
+ required bool serial = 1;
+}
+
+message SyncReplicationReplayWALStateData {
required string peer_id = 1;
+ repeated string wal = 2;
}
-message ReplaySyncReplicationWALStateData {
+message SyncReplicationReplayWALRemoteStateData {
required string peer_id = 1;
- required string wal = 2;
- optional ServerName target_server = 3;
+ repeated string wal = 2;
+ required ServerName target_server = 3;
}
message ReplaySyncReplicationWALParameter {
required string peer_id = 1;
- required string wal = 2;
+ repeated string wal = 2;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 2c23e85..dc62752 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -343,7 +343,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// manager of replication
private ReplicationPeerManager replicationPeerManager;
- private ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
+ private SyncReplicationReplayWALManager syncReplicationReplayWALManager;
// buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting
@@ -754,6 +754,7 @@ public class HMaster extends HRegionServer implements MasterServices {
this.splitOrMergeTracker.start();
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
+ this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
@@ -852,7 +853,6 @@ public class HMaster extends HRegionServer implements MasterServices {
initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
- this.replaySyncReplicationWALManager = new ReplaySyncReplicationWALManager(this);
// enable table descriptors cache
this.tableDescriptors.setCacheOn();
@@ -3764,7 +3764,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
- public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
- return this.replaySyncReplicationWALManager;
+ public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
+ return this.syncReplicationReplayWALManager;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 6034ff7..7b0c56a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -462,9 +462,9 @@ public interface MasterServices extends Server {
ReplicationPeerManager getReplicationPeerManager();
/**
- * Returns the {@link ReplaySyncReplicationWALManager}.
+ * Returns the {@link SyncReplicationReplayWALManager}.
*/
- ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager();
+ SyncReplicationReplayWALManager getSyncReplicationReplayWALManager();
/**
* Update the peerConfig for the specified peer
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 1420986..8a28b84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -207,7 +207,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// check if the next procedure is still a child.
// if not, remove the rq from the fairq and go back to the xlock state
Procedure<?> nextProc = rq.peek();
- if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
+ if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)
+ && nextProc.getRootProcId() != pollResult.getRootProcId()) {
removeFromRunQueue(fairq, rq);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
index 76b5163..0195ab9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
@@ -24,7 +24,7 @@ public interface PeerProcedureInterface {
enum PeerOperationType {
ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE,
- RECOVER_STANDBY, REPLAY_SYNC_REPLICATION_WAL
+ RECOVER_STANDBY, SYNC_REPLICATION_REPLAY_WAL, SYNC_REPLICATION_REPLAY_WAL_REMOTE
}
String getPeerId();
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
index 25feb7e..86d8e43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
@@ -50,6 +50,7 @@ class PeerQueue extends Queue<String> {
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
return proc.getPeerOperationType() != PeerOperationType.REFRESH
- && proc.getPeerOperationType() != PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
+ && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL
+ && proc.getPeerOperationType() != PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index 9860774..5494742 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -18,60 +18,79 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RecoverStandbyStateData;
@InterfaceAudience.Private
public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
private static final Logger LOG = LoggerFactory.getLogger(RecoverStandbyProcedure.class);
+ private boolean serial;
+
public RecoverStandbyProcedure() {
}
- public RecoverStandbyProcedure(String peerId) {
+ public RecoverStandbyProcedure(String peerId, boolean serial) {
super(peerId);
+ this.serial = serial;
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, RecoverStandbyState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
- ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
- env.getMasterServices().getReplaySyncReplicationWALManager();
+ SyncReplicationReplayWALManager syncReplicationReplayWALManager =
+ env.getMasterServices().getSyncReplicationReplayWALManager();
switch (state) {
case RENAME_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
+ syncReplicationReplayWALManager.renameToPeerReplayWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
setFailure("master-recover-standby", e);
return Flow.NO_MORE_STATE;
}
- setNextState(RecoverStandbyState.INIT_WORKERS);
+ setNextState(RecoverStandbyState.REGISTER_PEER_TO_WORKER_STORAGE);
return Flow.HAS_MORE_STATE;
- case INIT_WORKERS:
- replaySyncReplicationWALManager.initPeerWorkers(peerId);
- setNextState(RecoverStandbyState.DISPATCH_TASKS);
+ case REGISTER_PEER_TO_WORKER_STORAGE:
+ try {
+ syncReplicationReplayWALManager.registerPeer(peerId);
+ } catch (ReplicationException e) {
+ LOG.warn("Failed to register peer to worker storage for peer id={}, retry", peerId, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(RecoverStandbyState.DISPATCH_WALS);
return Flow.HAS_MORE_STATE;
- case DISPATCH_TASKS:
- addChildProcedure(getReplayWALs(replaySyncReplicationWALManager).stream()
- .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
- replaySyncReplicationWALManager.removeWALRootPath(wal)))
- .toArray(ReplaySyncReplicationWALProcedure[]::new));
+ case DISPATCH_WALS:
+ dispathWals(syncReplicationReplayWALManager);
+ setNextState(RecoverStandbyState.UNREGISTER_PEER_FROM_WORKER_STORAGE);
+ return Flow.HAS_MORE_STATE;
+ case UNREGISTER_PEER_FROM_WORKER_STORAGE:
+ try {
+ syncReplicationReplayWALManager.unregisterPeer(peerId);
+ } catch (ReplicationException e) {
+ LOG.warn("Failed to unregister peer from worker storage for peer id={}, retry", peerId,
+ e);
+ throw new ProcedureYieldException();
+ }
setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE;
case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
+ syncReplicationReplayWALManager.renameToPeerSnapshotWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
@@ -82,10 +101,14 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
}
}
- private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
+ // TODO: dispatch wals by region server when serial is true and sort wals
+ private void dispathWals(SyncReplicationReplayWALManager syncReplicationReplayWALManager)
throws ProcedureYieldException {
try {
- return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
+ List<Path> wals = syncReplicationReplayWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
+ addChildProcedure(wals.stream().map(wal -> new SyncReplicationReplayWALProcedure(peerId,
+ Arrays.asList(syncReplicationReplayWALManager.removeWALRootPath(wal))))
+ .toArray(SyncReplicationReplayWALProcedure[]::new));
} catch (IOException e) {
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
@@ -111,4 +134,17 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
public PeerOperationType getPeerOperationType() {
return PeerOperationType.RECOVER_STANDBY;
}
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(RecoverStandbyStateData.newBuilder().setSerial(serial).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ RecoverStandbyStateData data = serializer.deserialize(RecoverStandbyStateData.class);
+ serial = data.getSerial();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 254448a..4b77c8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,11 +67,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
}
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
- env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
+ env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
}
- @Override
- protected void postPeerModification(MasterProcedureEnv env)
+ @Override protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
if (peerConfig.isSyncReplication()) {
removeRemoteWALs(env);
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
deleted file mode 100644
index 348c134..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-@InterfaceAudience.Private
-public class ReplaySyncReplicationWALManager {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
-
- private final MasterServices services;
-
- private final FileSystem fs;
-
- private final Path walRootDir;
-
- private final Path remoteWALDir;
-
- private final Map<String, BlockingQueue<ServerName>> availServers = new HashMap<>();
-
- public ReplaySyncReplicationWALManager(MasterServices services) {
- this.services = services;
- this.fs = services.getMasterFileSystem().getWALFileSystem();
- this.walRootDir = services.getMasterFileSystem().getWALRootDir();
- this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
- }
-
- public void createPeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
- if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
- throw new IOException("Unable to mkdir " + peerRemoteWALDir);
- }
- }
-
- private void rename(Path src, Path dst, String peerId) throws IOException {
- if (fs.exists(src)) {
- deleteDir(dst, peerId);
- if (!fs.rename(src, dst)) {
- throw new IOException(
- "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
- }
- LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
- } else if (!fs.exists(dst)) {
- throw new IOException(
- "Want to rename from " + src + " to " + dst + ", but they both do not exist");
- }
- }
-
- public void renameToPeerReplayWALDir(String peerId) throws IOException {
- rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
- peerId);
- }
-
- public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
- rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
- peerId);
- }
-
- public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
- for (FileStatus status : fs.listStatus(peerReplayWALDir,
- p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
- Path src = status.getPath();
- String srcName = src.getName();
- String dstName =
- srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
- FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
- }
- List<Path> wals = new ArrayList<>();
- for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
- Path path = status.getPath();
- if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
- wals.add(path);
- } else {
- if (!fs.delete(path, true)) {
- LOG.warn("Can not delete unused file: " + path);
- }
- }
- }
- return wals;
- }
-
- public void snapshotPeerReplayWALDir(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
- if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
- throw new IOException(
- "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
- }
- }
-
- private void deleteDir(Path dir, String peerId) throws IOException {
- if (!fs.delete(dir, true) && fs.exists(dir)) {
- throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
- }
- }
-
- public void removePeerRemoteWALs(String peerId) throws IOException {
- deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
- deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
- deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
- }
-
- public void initPeerWorkers(String peerId) {
- BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
- services.getServerManager().getOnlineServers().keySet()
- .forEach(server -> servers.offer(server));
- availServers.put(peerId, servers);
- }
-
- public ServerName getAvailServer(String peerId, long timeout, TimeUnit unit)
- throws InterruptedException {
- return availServers.get(peerId).poll(timeout, unit);
- }
-
- public void addAvailServer(String peerId, ServerName server) {
- availServers.get(peerId).offer(server);
- }
-
- public String removeWALRootPath(Path path) {
- String pathStr = path.toString();
- // remove the "/" too.
- return pathStr.substring(walRootDir.toString().length() + 1);
- }
-
- @VisibleForTesting
- public Path getRemoteWALDir() {
- return remoteWALDir;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
deleted file mode 100644
index 77fd24d..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALProcedure.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
-import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
-import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
-import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
-import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALStateData;
-
-@InterfaceAudience.Private
-public class ReplaySyncReplicationWALProcedure extends Procedure<MasterProcedureEnv>
- implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ReplaySyncReplicationWALProcedure.class);
-
- private static final long DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT = 10000;
-
- private String peerId;
-
- private ServerName targetServer = null;
-
- private String wal;
-
- private boolean dispatched;
-
- private ProcedureEvent<?> event;
-
- private boolean succ;
-
- public ReplaySyncReplicationWALProcedure() {
- }
-
- public ReplaySyncReplicationWALProcedure(String peerId, String wal) {
- this.peerId = peerId;
- this.wal = wal;
- }
-
- @Override
- public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
- return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
- ReplaySyncReplicationWALParameter.newBuilder().setPeerId(peerId).setWal(wal).build()
- .toByteArray());
- }
-
- @Override
- public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
- complete(env, exception);
- }
-
- @Override
- public void remoteOperationCompleted(MasterProcedureEnv env) {
- complete(env, null);
- }
-
- @Override
- public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
- complete(env, error);
- }
-
- private void complete(MasterProcedureEnv env, Throwable error) {
- if (event == null) {
- LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
- getProcId());
- return;
- }
- ReplaySyncReplicationWALManager replaySyncReplicationWALManager =
- env.getMasterServices().getReplaySyncReplicationWALManager();
- if (error != null) {
- LOG.warn("Replay sync replication wal {} on {} failed for peer id={}", wal, targetServer,
- peerId, error);
- this.succ = false;
- } else {
- LOG.warn("Replay sync replication wal {} on {} suceeded for peer id={}", wal, targetServer,
- peerId);
- this.succ = true;
- replaySyncReplicationWALManager.addAvailServer(peerId, targetServer);
- }
- event.wake(env.getProcedureScheduler());
- event = null;
- }
-
- @Override
- protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (succ) {
- return null;
- }
- // retry
- dispatched = false;
- }
-
- // Try poll a available server
- if (targetServer == null) {
- targetServer = env.getMasterServices().getReplaySyncReplicationWALManager()
- .getAvailServer(peerId, DEFAULT_WAIT_AVAILABLE_SERVER_TIMEOUT, TimeUnit.MILLISECONDS);
- if (targetServer == null) {
- LOG.info("No available server to replay wal {} for peer id={}, retry", wal, peerId);
- throw new ProcedureYieldException();
- }
- }
-
- // Dispatch task to target server
- try {
- env.getRemoteDispatcher().addOperationToNode(targetServer, this);
- } catch (FailedRemoteDispatchException e) {
- LOG.info(
- "Can not add remote operation for replay wal {} on {} for peer id={}, " +
- "this usually because the server is already dead, " + "retry",
- wal, targetServer, peerId, e);
- targetServer = null;
- throw new ProcedureYieldException();
- }
- dispatched = true;
- event = new ProcedureEvent<>(this);
- event.suspendIfNotReady(this);
- throw new ProcedureSuspendedException();
- }
-
- @Override
- protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean abort(MasterProcedureEnv env) {
- return false;
- }
-
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- ReplaySyncReplicationWALStateData.Builder builder =
- ReplaySyncReplicationWALStateData.newBuilder().setPeerId(peerId).setWal(wal);
- if (targetServer != null) {
- builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
- }
- serializer.serialize(builder.build());
- }
-
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- ReplaySyncReplicationWALStateData data =
- serializer.deserialize(ReplaySyncReplicationWALStateData.class);
- peerId = data.getPeerId();
- wal = data.getWal();
- if (data.hasTargetServer()) {
- targetServer = ProtobufUtil.toServerName(data.getTargetServer());
- }
- }
-
- @Override
- public String getPeerId() {
- return peerId;
- }
-
- @Override
- public PeerOperationType getPeerOperationType() {
- return PeerOperationType.REPLAY_SYNC_REPLICATION_WAL;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
new file mode 100644
index 0000000..377c9f1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALManager.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.REMOTE_WAL_REPLAY_SUFFIX;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SyncReplicationReplayWALManager.class);
+
+ private final MasterServices services;
+
+ private final FileSystem fs;
+
+ private final Path walRootDir;
+
+ private final Path remoteWALDir;
+
+ private final ZKSyncReplicationReplayWALWorkerStorage workerStorage;
+
+ private final Map<String, Set<ServerName>> workers = new HashMap<>();
+
+ private final Object workerLock = new Object();
+
+ public SyncReplicationReplayWALManager(MasterServices services)
+ throws IOException, ReplicationException {
+ this.services = services;
+ this.fs = services.getMasterFileSystem().getWALFileSystem();
+ this.walRootDir = services.getMasterFileSystem().getWALRootDir();
+ this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
+ this.workerStorage = new ZKSyncReplicationReplayWALWorkerStorage(services.getZooKeeper(),
+ services.getConfiguration());
+ checkReplayingWALDir();
+ }
+
+ private void checkReplayingWALDir() throws IOException, ReplicationException {
+ FileStatus[] files = fs.listStatus(remoteWALDir);
+ for (FileStatus file : files) {
+ String name = file.getPath().getName();
+ if (name.endsWith(REMOTE_WAL_REPLAY_SUFFIX)) {
+ String peerId = name.substring(0, name.length() - REMOTE_WAL_REPLAY_SUFFIX.length());
+ workers.put(peerId, workerStorage.getPeerWorkers(peerId));
+ }
+ }
+ }
+
+ public void registerPeer(String peerId) throws ReplicationException {
+ workers.put(peerId, new HashSet<>());
+ workerStorage.addPeer(peerId);
+ }
+
+ public void unregisterPeer(String peerId) throws ReplicationException {
+ workers.remove(peerId);
+ workerStorage.removePeer(peerId);
+ }
+
+ public ServerName getPeerWorker(String peerId) throws ReplicationException {
+ Optional<ServerName> worker = Optional.empty();
+ ServerName workerServer = null;
+ synchronized (workerLock) {
+ worker = services.getServerManager().getOnlineServers().keySet().stream()
+ .filter(server -> !workers.get(peerId).contains(server)).findFirst();
+ if (worker.isPresent()) {
+ workerServer = worker.get();
+ workers.get(peerId).add(workerServer);
+ }
+ }
+ if (workerServer != null) {
+ workerStorage.addPeerWorker(peerId, workerServer);
+ }
+ return workerServer;
+ }
+
+ public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
+ synchronized (workerLock) {
+ workers.get(peerId).remove(worker);
+ }
+ workerStorage.removePeerWorker(peerId, worker);
+ }
+ public void createPeerRemoteWALDir(String peerId) throws IOException {
+ Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
+ if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
+ throw new IOException("Unable to mkdir " + peerRemoteWALDir);
+ }
+ }
+
+ private void rename(Path src, Path dst, String peerId) throws IOException {
+ if (fs.exists(src)) {
+ deleteDir(dst, peerId);
+ if (!fs.rename(src, dst)) {
+ throw new IOException(
+ "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
+ }
+ LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+ } else if (!fs.exists(dst)) {
+ throw new IOException(
+ "Want to rename from " + src + " to " + dst + ", but they both do not exist");
+ }
+ }
+
+ public void renameToPeerReplayWALDir(String peerId) throws IOException {
+ rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+ rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+ for (FileStatus status : fs.listStatus(peerReplayWALDir,
+ p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
+ Path src = status.getPath();
+ String srcName = src.getName();
+ String dstName =
+ srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
+ FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
+ }
+ List<Path> wals = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
+ Path path = status.getPath();
+ if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
+ wals.add(path);
+ } else {
+ if (!fs.delete(path, true)) {
+ LOG.warn("Can not delete unused file: " + path);
+ }
+ }
+ }
+ return wals;
+ }
+
+ public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+ if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
+ throw new IOException(
+ "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
+ }
+ }
+
+ private void deleteDir(Path dir, String peerId) throws IOException {
+ if (!fs.delete(dir, true) && fs.exists(dir)) {
+ throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
+ }
+ }
+
+ public void removePeerRemoteWALs(String peerId) throws IOException {
+ deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+ }
+
+ public String removeWALRootPath(Path path) {
+ String pathStr = path.toString();
+ // remove the "/" too.
+ return pathStr.substring(walRootDir.toString().length() + 1);
+ }
+
+ public void finishReplayWAL(String wal) throws IOException {
+ Path walPath = new Path(walRootDir, wal);
+ fs.truncate(walPath, 0);
+ }
+
+ public boolean isReplayWALFinished(String wal) throws IOException {
+ Path walPath = new Path(walRootDir, wal);
+ return fs.getFileStatus(walPath).getLen() == 0;
+ }
+
+ @VisibleForTesting
+ public Path getRemoteWALDir() {
+ return remoteWALDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
new file mode 100644
index 0000000..26d6a3f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALStateData;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALProcedure
+ extends StateMachineProcedure<MasterProcedureEnv, SyncReplicationReplayWALState>
+ implements PeerProcedureInterface {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SyncReplicationReplayWALProcedure.class);
+
+ private String peerId;
+
+ private ServerName worker = null;
+
+ private List<String> wals;
+
+ public SyncReplicationReplayWALProcedure() {
+ }
+
+ public SyncReplicationReplayWALProcedure(String peerId, List<String> wals) {
+ this.peerId = peerId;
+ this.wals = wals;
+ }
+
+ @Override protected Flow executeFromState(MasterProcedureEnv env,
+ SyncReplicationReplayWALState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ SyncReplicationReplayWALManager syncReplicationReplayWALManager =
+ env.getMasterServices().getSyncReplicationReplayWALManager();
+ switch (state) {
+ case ASSIGN_WORKER:
+ try {
+ worker = syncReplicationReplayWALManager.getPeerWorker(peerId);
+ } catch (ReplicationException e) {
+ LOG.info("Failed to get worker to replay wals {} for peer id={}, retry", wals, peerId);
+ throw new ProcedureYieldException();
+ }
+ if (worker == null) {
+ LOG.info("No worker to replay wals {} for peer id={}, retry", wals, peerId);
+ setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
+ } else {
+ setNextState(SyncReplicationReplayWALState.DISPATCH_WALS_TO_WORKER);
+ }
+ return Flow.HAS_MORE_STATE;
+ case DISPATCH_WALS_TO_WORKER:
+ addChildProcedure(new SyncReplicationReplayWALRemoteProcedure(peerId, wals, worker));
+ setNextState(SyncReplicationReplayWALState.RELEASE_WORKER);
+ return Flow.HAS_MORE_STATE;
+ case RELEASE_WORKER:
+ boolean finished = false;
+ try {
+ finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
+ } catch (IOException e) {
+ LOG.info("Failed to check whether replay wals {} finished for peer id={}", wals, peerId);
+ throw new ProcedureYieldException();
+ }
+ try {
+ syncReplicationReplayWALManager.removePeerWorker(peerId, worker);
+ } catch (ReplicationException e) {
+ LOG.info("Failed to remove worker for peer id={}, retry", peerId);
+ throw new ProcedureYieldException();
+ }
+ if (!finished) {
+ LOG.info("Failed to replay wals {} for peer id={}, retry", wals, peerId);
+ setNextState(SyncReplicationReplayWALState.ASSIGN_WORKER);
+ return Flow.HAS_MORE_STATE;
+ }
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env,
+ SyncReplicationReplayWALState state)
+ throws IOException, InterruptedException {
+ if (state == getInitialState()) {
+ return;
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected SyncReplicationReplayWALState getState(int state) {
+ return SyncReplicationReplayWALState.forNumber(state);
+ }
+
+ @Override
+ protected int getStateId(
+ SyncReplicationReplayWALState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected SyncReplicationReplayWALState getInitialState() {
+ return SyncReplicationReplayWALState.ASSIGN_WORKER;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ SyncReplicationReplayWALStateData.Builder builder =
+ SyncReplicationReplayWALStateData.newBuilder();
+ builder.setPeerId(peerId);
+ wals.stream().forEach(builder::addWal);
+ serializer.serialize(builder.build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ SyncReplicationReplayWALStateData data =
+ serializer.deserialize(SyncReplicationReplayWALStateData.class);
+ peerId = data.getPeerId();
+ wals = new ArrayList<>();
+ data.getWalList().forEach(wals::add);
+ }
+
+ @Override
+ public String getPeerId() {
+ return peerId;
+ }
+
+ @Override
+ public PeerOperationType getPeerOperationType() {
+ return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
new file mode 100644
index 0000000..9f4f330
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;
+
+@InterfaceAudience.Private
+public class SyncReplicationReplayWALRemoteProcedure extends Procedure<MasterProcedureEnv>
+ implements RemoteProcedure<MasterProcedureEnv, ServerName>, PeerProcedureInterface {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);
+
+ private String peerId;
+
+ private ServerName targetServer;
+
+ private List<String> wals;
+
+ private boolean dispatched;
+
+ private ProcedureEvent<?> event;
+
+ private boolean succ;
+
+ public SyncReplicationReplayWALRemoteProcedure() {
+ }
+
+ public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
+ ServerName targetServer) {
+ this.peerId = peerId;
+ this.wals = wals;
+ this.targetServer = targetServer;
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+ ReplaySyncReplicationWALParameter.Builder builder =
+ ReplaySyncReplicationWALParameter.newBuilder();
+ builder.setPeerId(peerId);
+ wals.stream().forEach(builder::addWal);
+ return new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
+ builder.build().toByteArray());
+ }
+
+ @Override
+ public void remoteCallFailed(MasterProcedureEnv env, ServerName remote, IOException exception) {
+ complete(env, exception);
+ }
+
+ @Override
+ public void remoteOperationCompleted(MasterProcedureEnv env) {
+ complete(env, null);
+ }
+
+ @Override
+ public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+ complete(env, error);
+ }
+
+ private void complete(MasterProcedureEnv env, Throwable error) {
+ if (event == null) {
+ LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+ getProcId());
+ return;
+ }
+ if (error != null) {
+ LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
+ this.succ = false;
+ } else {
+ truncateWALs(env);
+ LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
+ this.succ = true;
+ }
+ event.wake(env.getProcedureScheduler());
+ event = null;
+ }
+
+ /**
+ * Only truncate wals one by one when task succeed. The parent procedure will check the first
+ * wal length to know whether this task succeed.
+ */
+ private void truncateWALs(MasterProcedureEnv env) {
+ String firstWal = wals.get(0);
+ try {
+ env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(firstWal);
+ } catch (IOException e) {
+ // As it is idempotent to rerun this task. Just ignore this exception and return.
+ LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, e);
+ return;
+ }
+ for (int i = 1; i < wals.size(); i++) {
+ String wal = wals.get(i);
+ try {
+ env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
+ } catch (IOException e1) {
+ try {
+ // retry
+ env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
+ } catch (IOException e2) {
+ // As the parent procedure only check the first wal length. Just ignore this exception.
+ LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, e2);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ if (dispatched) {
+ if (succ) {
+ return null;
+ }
+ // retry
+ dispatched = false;
+ }
+
+ // Dispatch task to target server
+ try {
+ env.getRemoteDispatcher().addOperationToNode(targetServer, this);
+ } catch (FailedRemoteDispatchException e) {
+ LOG.warn(
+ "Can not add remote operation for replay wals {} on {} for peer id={}, "
+ + "this usually because the server is already dead, retry",
+ wals, targetServer, peerId);
+ throw new ProcedureYieldException();
+ }
+ dispatched = true;
+ event = new ProcedureEvent<>(this);
+ event.suspendIfNotReady(this);
+ throw new ProcedureSuspendedException();
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
+ throws IOException {
+ SyncReplicationReplayWALRemoteStateData.Builder builder =
+ SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
+ .setTargetServer(ProtobufUtil.toServerName(targetServer));
+ wals.stream().forEach(builder::addWal);
+ serializer.serialize(builder.build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ SyncReplicationReplayWALRemoteStateData data =
+ serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
+ peerId = data.getPeerId();
+ wals = new ArrayList<>();
+ data.getWalList().forEach(wals::add);
+ targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ }
+
+ @Override
+ public String getPeerId() {
+ return peerId;
+ }
+
+ @Override
+ public PeerOperationType getPeerOperationType() {
+ return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 66f67dd..c650974 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -186,8 +186,8 @@ public class TransitPeerSyncReplicationStateProcedure
}
}
- private void replayRemoteWAL() {
- addChildProcedure(new RecoverStandbyProcedure(peerId));
+ private void replayRemoteWAL(boolean serial) {
+ addChildProcedure(new RecoverStandbyProcedure(peerId, serial));
}
@Override
@@ -232,7 +232,7 @@ public class TransitPeerSyncReplicationStateProcedure
setNextStateAfterRefreshBegin();
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
- replayRemoteWAL();
+ replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
new file mode 100644
index 0000000..5991cf0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ZKSyncReplicationReplayWALWorkerStorage.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper-backed record of the region servers used as WAL replay workers, one child znode per
+ * worker under a per-peer znode:
+ * {@code <replicationZNode>/<workersZNode name>/<peerId>/<serverName>}.
+ */
+@InterfaceAudience.Private
+public class ZKSyncReplicationReplayWALWorkerStorage extends ZKReplicationStorageBase {
+
+  public static final String WORKERS_ZNODE = "zookeeper.znode.sync.replication.replaywal.workers";
+
+  public static final String WORKERS_ZNODE_DEFAULT = "replaywal-workers";
+
+  /**
+   * The name of the znode that contains a list of workers to replay wal.
+   */
+  private final String workersZNode;
+
+  public ZKSyncReplicationReplayWALWorkerStorage(ZKWatcher zookeeper, Configuration conf) {
+    super(zookeeper, conf);
+    String workersZNodeName = conf.get(WORKERS_ZNODE, WORKERS_ZNODE_DEFAULT);
+    workersZNode = ZNodePaths.joinZNode(replicationZNode, workersZNodeName);
+  }
+
+  private String getPeerNode(String peerId) {
+    return ZNodePaths.joinZNode(workersZNode, peerId);
+  }
+
+  /** Creates the peer's znode (and any missing parents). */
+  public void addPeer(String peerId) throws ReplicationException {
+    try {
+      ZKUtil.createWithParents(zookeeper, getPeerNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+        "Failed to add peer id=" + peerId + " to replaywal-workers storage", e);
+    }
+  }
+
+  /** Recursively deletes the peer's znode, including all of its worker znodes. */
+  public void removePeer(String peerId) throws ReplicationException {
+    try {
+      ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+        "Failed to remove peer id=" + peerId + " from replaywal-workers storage", e);
+    }
+  }
+
+  private String getPeerWorkerNode(String peerId, ServerName worker) {
+    return ZNodePaths.joinZNode(getPeerNode(peerId), worker.getServerName());
+  }
+
+  /** Records {@code worker} as a replay worker of the peer, creating parents as needed. */
+  public void addPeerWorker(String peerId, ServerName worker) throws ReplicationException {
+    try {
+      ZKUtil.createWithParents(zookeeper, getPeerWorkerNode(peerId, worker));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to add worker=" + worker + " for peer id=" + peerId,
+        e);
+    }
+  }
+
+  /**
+   * Removes the worker's znode. A znode that is already gone counts as removed, so retried
+   * releases do not fail forever.
+   */
+  public void removePeerWorker(String peerId, ServerName worker) throws ReplicationException {
+    try {
+      ZKUtil.deleteNode(zookeeper, getPeerWorkerNode(peerId, worker));
+    } catch (KeeperException.NoNodeException ignored) {
+      // Nothing to remove: the desired end state is already reached.
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to remove worker=" + worker + " for peer id=" + peerId,
+        e);
+    }
+  }
+
+  /**
+   * Lists the recorded workers of the peer.
+   *
+   * @return a mutable set (callers add to and remove from it); empty if the peer znode has no
+   *         children listed
+   */
+  public Set<ServerName> getPeerWorkers(String peerId) throws ReplicationException {
+    try {
+      List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getPeerNode(peerId));
+      if (children == null) {
+        return new HashSet<>();
+      }
+      // Collectors.toSet() makes no mutability guarantee, so collect into a HashSet explicitly.
+      return children.stream().map(ServerName::valueOf)
+        .collect(Collectors.toCollection(HashSet::new));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to list workers for peer id=" + peerId, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
index 3cf065c..24963f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
@@ -21,6 +21,8 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
@@ -68,31 +71,28 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
private String peerId;
- private String wal;
+ private List<String> wals = new ArrayList<>();
private Exception initError;
private long batchSize;
+ private final KeyLocker<String> peersLock = new KeyLocker<>();
+
@Override
public Void call() throws Exception {
if (initError != null) {
throw initError;
}
- LOG.info("Received a replay sync replication wal {} event, peerId={}", wal, peerId);
+ LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
if (rs.getReplicationSinkService() != null) {
- try (Reader reader = getReader()) {
- List<Entry> entries = readWALEntries(reader);
- while (!entries.isEmpty()) {
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
- .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
- ReplicateWALEntryRequest request = pair.getFirst();
- rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
- pair.getSecond(), request.getReplicationClusterId(),
- request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
- // Read next entries.
- entries = readWALEntries(reader);
+ Lock peerLock = peersLock.acquireLock(wals.get(0));
+ try {
+ for (String wal : wals) {
+ replayWAL(wal);
}
+ } finally {
+ peerLock.unlock();
}
}
return null;
@@ -107,7 +107,8 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
ReplaySyncReplicationWALParameter param =
ReplaySyncReplicationWALParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
- this.wal = param.getWal();
+ // WALs are replayed in the order they appear in the request parameter.
+ this.wals.addAll(param.getWalList());
this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
} catch (InvalidProtocolBufferException e) {
@@ -120,7 +120,23 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable {
return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
}
- private Reader getReader() throws IOException {
+ /**
+ * Replays one WAL into this region server's replication sink: entries are read in batches via
+ * {@code readWALEntries}, and each non-empty batch is turned into a replicate request and
+ * applied, until an empty batch is returned.
+ * @param wal path of the WAL, relative to the region server's WAL root directory
+ * @throws IOException if opening, reading or applying the WAL fails
+ */
+ private void replayWAL(String wal) throws IOException {
+ try (Reader reader = getReader(wal)) {
+ List<Entry> entries = readWALEntries(reader);
+ while (!entries.isEmpty()) {
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtbufUtil
+ .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
+ ReplicateWALEntryRequest request = pair.getFirst();
+ rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
+ pair.getSecond(), request.getReplicationClusterId(),
+ request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
+ // Read next entries.
+ entries = readWALEntries(reader);
+ }
+ }
+ }
+
+ /**
+ * Opens a reader for {@code wal}, resolved against the region server's WAL root directory.
+ */
+ private Reader getReader(String wal) throws IOException {
Path path = new Path(rs.getWALRootDir(), wal);
long length = rs.getWALFileSystem().getFileStatus(path).getLen();
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 48d47ea..ac20dbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -476,7 +476,7 @@ public class MockNoopMasterServices implements MasterServices {
}
+ /** This no-op mock has no sync replication WAL replay manager; always returns {@code null}. */
@Override
- public ReplaySyncReplicationWALManager getReplaySyncReplicationWALManager() {
+ public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
return null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index a20edd3..f765139 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -78,7 +78,7 @@ public class SyncReplicationTestBase {
protected static Path REMOTE_WAL_DIR2;
- private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+ /**
+ * Configures {@code util} before its mini cluster is started: among other settings, it reuses
+ * the shared mini ZooKeeper cluster and uses {@code zkParent} as its ZooKeeper znode parent.
+ */
+ protected static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
Configuration conf = util.getConfiguration();
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
@@ -102,8 +102,10 @@ public class SyncReplicationTestBase {
ZK_UTIL.startMiniZKCluster();
initTestingUtility(UTIL1, "/cluster1");
initTestingUtility(UTIL2, "/cluster2");
- UTIL1.startMiniCluster(3);
- UTIL2.startMiniCluster(3);
+ // 2 masters and 3 region servers per cluster, so that a backup master can take over when a
+ // test stops the active master.
+ UTIL1.startMiniCluster(2, 3);
+ UTIL2.startMiniCluster(2, 3);
TableDescriptor td =
TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
new file mode 100644
index 0000000..6265f5c
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillMaster.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transits a STANDBY sync replication peer to DOWNGRADE_ACTIVE while the active master of the
+ * standby cluster is being stopped, then verifies that rows [0, COUNT) written on the source
+ * cluster, with async replication disabled, are present on the standby cluster.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillMaster extends SyncReplicationTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSyncReplicationStandbyKillMaster.class);
+
+ private static final long SLEEP_TIME = 2000;
+
+ private static final int COUNT = 1000;
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillMaster.class);
+
+ @Test
+ public void testStandbyKillMaster() throws Exception {
+ MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+ Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+ assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ // Disable async replication and write data, then shutdown
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ write(UTIL1, 0, COUNT);
+ UTIL1.shutdownMiniCluster();
+
+ // Stop the active master of the standby cluster while the transition below is running.
+ Thread t = new Thread(() -> {
+ try {
+ Thread.sleep(SLEEP_TIME);
+ UTIL2.getMiniHBaseCluster().getMaster().stop("Stop master for test");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted before stopping master", e);
+ } catch (Exception e) {
+ LOG.error("Failed to stop master", e);
+ }
+ });
+ t.start();
+
+ // Transit standby to DA to replay logs. The call may fail because the master is stopped
+ // concurrently, so only log the failure and wait for the final state below.
+ try {
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ } catch (Exception e) {
+ LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE,
+ e);
+ }
+
+ while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+ != SyncReplicationState.DOWNGRADE_ACTIVE) {
+ Thread.sleep(SLEEP_TIME);
+ }
+ // Make sure the master-stopping thread is done before the test returns.
+ t.join();
+ verify(UTIL2, 0, COUNT);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
new file mode 100644
index 0000000..3c9724f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandbyKillRS.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transits a STANDBY sync replication peer to DOWNGRADE_ACTIVE while the standby cluster's
+ * region servers are stopped and replaced one by one, then verifies that rows [0, COUNT) written
+ * on the source cluster, with async replication disabled, are present on the standby cluster.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandbyKillRS extends SyncReplicationTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSyncReplicationStandbyKillRS.class);
+
+ private static final long SLEEP_TIME = 1000;
+
+ private static final int COUNT = 1000;
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationStandbyKillRS.class);
+
+ @Test
+ public void testStandbyKillRegionServer() throws Exception {
+ MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+ Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+ assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ // Disable async replication and write data, then shutdown
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ write(UTIL1, 0, COUNT);
+ UTIL1.shutdownMiniCluster();
+
+ JVMClusterUtil.MasterThread activeMaster = UTIL2.getMiniHBaseCluster().getMasterThread();
+ // While the transition below is running, stop each live region server in turn, wait for the
+ // master to finish processing it as dead, and start a replacement.
+ Thread t = new Thread(() -> {
+ try {
+ List<JVMClusterUtil.RegionServerThread> regionServers =
+ UTIL2.getMiniHBaseCluster().getLiveRegionServerThreads();
+ for (JVMClusterUtil.RegionServerThread rst : regionServers) {
+ ServerName serverName = rst.getRegionServer().getServerName();
+ rst.getRegionServer().stop("Stop RS for test");
+ waitForRSShutdownToStartAndFinish(activeMaster, serverName);
+ JVMClusterUtil.RegionServerThread restarted =
+ UTIL2.getMiniHBaseCluster().startRegionServer();
+ restarted.waitForServerOnline();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while killing RS", e);
+ } catch (Exception e) {
+ LOG.error("Failed to kill RS", e);
+ }
+ });
+ t.start();
+
+ // Transit standby to DA to replay logs. The call may fail while region servers are being
+ // killed, so only log the failure and wait for the final state below.
+ try {
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ } catch (Exception e) {
+ LOG.error("Failed to transit standby cluster to " + SyncReplicationState.DOWNGRADE_ACTIVE,
+ e);
+ }
+
+ while (UTIL2.getAdmin().getReplicationPeerSyncReplicationState(PEER_ID)
+ != SyncReplicationState.DOWNGRADE_ACTIVE) {
+ Thread.sleep(SLEEP_TIME);
+ }
+ // Wait until every region server has been stopped and replaced before verifying.
+ t.join();
+ verify(UTIL2, 0, COUNT);
+ }
+
+ /**
+ * Blocks until the master in {@code activeMaster} lists {@code serverName} as dead and then has
+ * no dead-server processing in progress, polling every {@link #SLEEP_TIME} ms.
+ */
+ private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster,
+ ServerName serverName) throws InterruptedException {
+ ServerManager sm = activeMaster.getMaster().getServerManager();
+ // First wait for it to be in dead list
+ while (!sm.getDeadServers().isDeadServer(serverName)) {
+ LOG.debug("Waiting for [{}] to be listed as dead in master", serverName);
+ Thread.sleep(SLEEP_TIME);
+ }
+ LOG.debug("Server [{}] marked as dead, waiting for it to finish dead processing", serverName);
+ while (sm.areDeadServersInProgress()) {
+ LOG.debug("Server [{}] still being processed, waiting", serverName);
+ Thread.sleep(SLEEP_TIME);
+ }
+ LOG.debug("Server [{}] done with server shutdown processing", serverName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44ca13fe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index 2563669..d01a0ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
-import org.apache.hadoop.hbase.master.replication.ReplaySyncReplicationWALManager;
+import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
@@ -92,7 +92,7 @@ public class TestRecoverStandbyProcedure {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private static ReplaySyncReplicationWALManager replaySyncReplicationWALManager;
+ // Used, e.g. by setupSyncReplicationWALs, to locate the remote WAL root directory.
+ private static SyncReplicationReplayWALManager syncReplicationReplayWALManager;
private static ProcedureExecutor<MasterProcedureEnv> procExec;
@@ -107,7 +107,7 @@ public class TestRecoverStandbyProcedure {
conf = UTIL.getConfiguration();
HMaster master = UTIL.getHBaseCluster().getMaster();
fs = master.getMasterFileSystem().getWALFileSystem();
- replaySyncReplicationWALManager = master.getReplaySyncReplicationWALManager();
+ syncReplicationReplayWALManager = master.getSyncReplicationReplayWALManager();
procExec = master.getMasterProcedureExecutor();
}
@@ -138,7 +138,7 @@ public class TestRecoverStandbyProcedure {
@Test
public void testRecoverStandby() throws IOException, StreamLacksCapabilityException {
setupSyncReplicationWALs();
- long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID));
+ // NOTE(review): the new boolean argument is presumably the peer's "serial" flag; confirm
+ // against the RecoverStandbyProcedure constructor.
+ long procId = procExec.submitProcedure(new RecoverStandbyProcedure(PEER_ID, false));
ProcedureTestingUtility.waitProcedure(procExec, procId);
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
@@ -153,7 +153,7 @@ public class TestRecoverStandbyProcedure {
private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
Path peerRemoteWALDir = ReplicationUtils
- .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID);
+ .getPeerRemoteWALDir(syncReplicationReplayWALManager.getRemoteWALDir(), PEER_ID);
if (!fs.exists(peerRemoteWALDir)) {
fs.mkdirs(peerRemoteWALDir);
}