You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/31 07:02:07 UTC
[09/36] hbase git commit: HBASE-20424 Allow writing WAL to local and
remote cluster concurrently
HBASE-20424 Allow writing WAL to local and remote cluster concurrently
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/46866d9f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/46866d9f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/46866d9f
Branch: refs/heads/HBASE-19064
Commit: 46866d9f6f3372202c5e4f172ac9c3f46258aa22
Parents: 25cac93
Author: zhangduo <zh...@apache.org>
Authored: Thu May 24 16:20:28 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 31 14:31:01 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/MasterProcedure.proto | 2 +-
.../hbase/replication/ReplicationUtils.java | 26 ++-
.../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 3 +-
.../replication/RecoverStandbyProcedure.java | 10 +-
.../master/replication/RemovePeerProcedure.java | 5 +-
.../ReplaySyncReplicationWALManager.java | 110 ++++++-----
...ransitPeerSyncReplicationStateProcedure.java | 4 +-
.../hbase/regionserver/HRegionServer.java | 3 +-
.../regionserver/ReplicationSourceService.java | 6 +
.../hbase/regionserver/SplitLogWorker.java | 188 +++++++++++++------
.../regionserver/wal/CombinedAsyncWriter.java | 80 ++------
.../hbase/regionserver/wal/DualAsyncFSWAL.java | 11 +-
.../replication/regionserver/Replication.java | 5 +
.../regionserver/ReplicationSourceManager.java | 2 +-
.../SyncReplicationPeerInfoProviderImpl.java | 3 +-
.../org/apache/hadoop/hbase/util/FSUtils.java | 9 +
.../hbase/wal/SyncReplicationWALProvider.java | 43 ++++-
.../replication/TestReplicationAdmin.java | 2 +-
.../wal/TestCombinedAsyncWriter.java | 20 +-
.../replication/DualAsyncFSWALForTest.java | 149 +++++++++++++++
.../replication/SyncReplicationTestBase.java | 12 +-
.../replication/TestSyncReplicationActive.java | 5 +-
...cReplicationMoreLogsInLocalCopyToRemote.java | 108 +++++++++++
...plicationMoreLogsInLocalGiveUpSplitting.java | 128 +++++++++++++
.../TestSyncReplicationRemoveRemoteWAL.java | 7 +-
.../replication/TestSyncReplicationStandBy.java | 20 +-
.../master/TestRecoverStandbyProcedure.java | 4 +-
.../TestReplicationSourceManager.java | 5 +-
.../wal/TestSyncReplicationWALProvider.java | 1 -
29 files changed, 733 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f58ad2e..5764a21 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -476,7 +476,7 @@ enum RecoverStandbyState {
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
INIT_WORKERS = 2;
DISPATCH_TASKS = 3;
- REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
+ SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
}
message RecoverStandbyStateData {
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 069db7a..dc4217c 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -46,6 +46,16 @@ public final class ReplicationUtils {
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
+ public static final String SYNC_WAL_SUFFIX = ".syncrep";
+
+ public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
+
+ public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
+
+ // This is used for copying sync replication log from local to remote and overwrite the old one
+ // since some FileSystem implementation may not support atomic rename.
+ public static final String RENAME_WAL_SUFFIX = ".ren";
+
private ReplicationUtils() {
}
@@ -187,14 +197,26 @@ public final class ReplicationUtils {
return new Path(remoteWALDir).getFileSystem(conf);
}
- public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
- public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
+ public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
/**
* Do the sleeping logic
* @param msg Why we sleep
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 1645d68..7ffd3da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -22,9 +22,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index e9e3a97..9860774 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
switch (state) {
case RENAME_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId);
+ replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
setFailure("master-recover-standby", e);
@@ -67,11 +67,11 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
.map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
replaySyncReplicationWALManager.removeWALRootPath(wal)))
.toArray(ReplaySyncReplicationWALProcedure[]::new));
- setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
+ setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE;
- case REMOVE_SYNC_REPLICATION_WALS_DIR:
+ case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
+ replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
@@ -85,7 +85,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
throws ProcedureYieldException {
try {
- return replaySyncReplicationWALManager.getReplayWALs(peerId);
+ return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
} catch (IOException e) {
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 7335fe0..254448a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
}
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
- ReplaySyncReplicationWALManager remoteWALManager =
- env.getMasterServices().getReplaySyncReplicationWALManager();
- remoteWALManager.removePeerRemoteWALs(peerId);
- remoteWALManager.removePeerReplayWALDir(peerId);
+ env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index eac5aa4..348c134 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,31 +29,27 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
@InterfaceAudience.Private
public class ReplaySyncReplicationWALManager {
private static final Logger LOG =
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
- private static final String REPLAY_SUFFIX = "-replay";
-
private final MasterServices services;
- private final Configuration conf;
-
private final FileSystem fs;
private final Path walRootDir;
@@ -60,69 +60,86 @@ public class ReplaySyncReplicationWALManager {
public ReplaySyncReplicationWALManager(MasterServices services) {
this.services = services;
- this.conf = services.getConfiguration();
this.fs = services.getMasterFileSystem().getWALFileSystem();
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
}
- public Path getPeerRemoteWALDir(String peerId) {
- return new Path(this.remoteWALDir, peerId);
- }
-
- private Path getPeerReplayWALDir(String peerId) {
- return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
- }
-
public void createPeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+ Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
throw new IOException("Unable to mkdir " + peerRemoteWALDir);
}
}
- public void renamePeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
- Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
- if (fs.exists(peerRemoteWALDir)) {
- if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
- throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
- + peerReplayWALDir + " for peer id=" + peerId);
+ private void rename(Path src, Path dst, String peerId) throws IOException {
+ if (fs.exists(src)) {
+ deleteDir(dst, peerId);
+ if (!fs.rename(src, dst)) {
+ throw new IOException(
+ "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
}
- LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
- peerId);
- } else if (!fs.exists(peerReplayWALDir)) {
- throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
- + peerReplayWALDir + " not exist for peer id=" + peerId);
+ LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+ } else if (!fs.exists(dst)) {
+ throw new IOException(
+ "Want to rename from " + src + " to " + dst + ", but they both do not exist");
}
}
- public List<Path> getReplayWALs(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
- List<Path> replayWals = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
- while (iterator.hasNext()) {
- replayWals.add(iterator.next().getPath());
+ public void renameToPeerReplayWALDir(String peerId) throws IOException {
+ rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+ rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+ for (FileStatus status : fs.listStatus(peerReplayWALDir,
+ p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
+ Path src = status.getPath();
+ String srcName = src.getName();
+ String dstName =
+ srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
+ FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
+ }
+ List<Path> wals = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
+ Path path = status.getPath();
+ if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
+ wals.add(path);
+ } else {
+ if (!fs.delete(path, true)) {
+ LOG.warn("Can not delete unused file: " + path);
+ }
+ }
}
- return replayWals;
+ return wals;
}
- public void removePeerReplayWALDir(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+ public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
throw new IOException(
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
}
}
- public void removePeerRemoteWALs(String peerId) throws IOException {
- Path remoteWALDir = getPeerRemoteWALDir(peerId);
- if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
- throw new IOException(
- "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
+ private void deleteDir(Path dir, String peerId) throws IOException {
+ if (!fs.delete(dir, true) && fs.exists(dir)) {
+ throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
}
}
+ public void removePeerRemoteWALs(String peerId) throws IOException {
+ deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+ }
+
public void initPeerWorkers(String peerId) {
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
services.getServerManager().getOnlineServers().keySet()
@@ -144,4 +161,9 @@ public class ReplaySyncReplicationWALManager {
// remove the "/" too.
return pathStr.substring(walRootDir.toString().length() + 1);
}
+
+ @VisibleForTesting
+ public Path getRemoteWALDir() {
+ return remoteWALDir;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index ebe7a93..81ee6b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
if (toState == SyncReplicationState.ACTIVE) {
Path remoteWALDirForPeer =
- ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
+ ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
// check whether the remote wal directory is present
if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
.exists(remoteWALDirForPeer)) {
@@ -152,7 +152,7 @@ public class TransitPeerSyncReplicationStateProcedure
throws ProcedureYieldException, IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5052a0b..90f3099 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1957,8 +1957,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
if (this.csm != null) {
// SplitLogWorker needs csm. If none, don't start this.
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
- this, walFactory);
+ this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
splitLogWorker.start();
} else {
LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 4529943..09ec477 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
+
+ /**
+ * Return the replication peers.
+ */
+ ReplicationPeers getReplicationPeers();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a1c2030..4a9712c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -23,22 +23,31 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -67,67 +76,133 @@ public class SplitLogWorker implements Runnable {
Thread worker;
// thread pool which executes recovery work
private SplitLogWorkerCoordination coordination;
- private Configuration conf;
private RegionServerServices server;
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
this.server = server;
- this.conf = conf;
this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
coordination.init(server, conf, splitTaskExecutor, this);
}
- public SplitLogWorker(final Server hserver, final Configuration conf,
- final RegionServerServices server, final LastSequenceId sequenceIdChecker,
- final WALFactory factory) {
- this(hserver, conf, server, new TaskExecutor() {
- @Override
- public Status exec(String filename, CancelableProgressable p) {
- Path walDir;
- FileSystem fs;
- try {
- walDir = FSUtils.getWALRootDir(conf);
- fs = walDir.getFileSystem(conf);
- } catch (IOException e) {
- LOG.warn("could not find root dir or fs", e);
- return Status.RESIGNED;
- }
- // TODO have to correctly figure out when log splitting has been
- // interrupted or has encountered a transient error and when it has
- // encountered a bad non-retry-able persistent error.
- try {
- if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
- fs, conf, p, sequenceIdChecker,
- server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
- return Status.PREEMPTED;
- }
- } catch (InterruptedIOException iioe) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
- return Status.RESIGNED;
- } catch (IOException e) {
- if (e instanceof FileNotFoundException) {
- // A wal file may not exist anymore. Nothing can be recovered so move on
- LOG.warn("WAL {} does not exist anymore", filename, e);
- return Status.DONE;
- }
- Throwable cause = e.getCause();
- if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
- || cause instanceof ConnectException
- || cause instanceof SocketTimeoutException)) {
- LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
- + "resigning", e);
- return Status.RESIGNED;
- } else if (cause instanceof InterruptedException) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
- return Status.RESIGNED;
- }
- LOG.warn("log splitting of " + filename + " failed, returning error", e);
- return Status.ERR;
- }
+ public SplitLogWorker(Configuration conf, RegionServerServices server,
+ LastSequenceId sequenceIdChecker, WALFactory factory) {
+ this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
+ }
+
+ // returns whether we need to continue the split work
+ private static boolean processSyncReplicationWAL(String name, Configuration conf,
+ RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
+ Path walFile = new Path(walDir, name);
+ String filename = walFile.getName();
+ Optional<String> optSyncPeerId =
+ SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
+ if (!optSyncPeerId.isPresent()) {
+ return true;
+ }
+ String peerId = optSyncPeerId.get();
+ ReplicationPeerImpl peer =
+ server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
+ if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
+ return true;
+ }
+ Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+ peer.getSyncReplicationStateAndNewState();
+ if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
+ // copy the file to remote and overwrite the previous one
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+ Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
+ FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation")
+ FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
+ FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL),
+ remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
+ IOUtils.copy(in, out);
+ }
+ Path toCommitRemoteWAL =
+ new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX);
+ // Some FileSystem implementations may not support atomic rename so we need to do it in two
+ // phases
+ FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
+ FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename));
+ } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) ||
+ stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
+ // check whether we still need to process this file
+ // actually we only write wal file which name is ended with .syncrep in A state, and after
+ // transiting to a state other than A, we will reopen all the regions so the data in the wal
+ // will be flushed so the wal file will be archived soon. But it is still possible that there
+ // is a server crash when we are transiting from A to S, to simplify the logic of the transit
+ // procedure, here we will also check the remote snapshot directory in state S, so that we do
+ // not need wait until all the wal files with .syncrep suffix to be archived before finishing
+ // the procedure.
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
+ FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
+ // the file has been replayed when the remote cluster was transited from S to DA, the
+ // content will be replicated back to us so give up split it.
+ LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and " +
+ "the content will be replicated back", filename);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Status splitLog(String name, CancelableProgressable p, Configuration conf,
+ RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
+ Path walDir;
+ FileSystem fs;
+ try {
+ walDir = FSUtils.getWALRootDir(conf);
+ fs = walDir.getFileSystem(conf);
+ } catch (IOException e) {
+ LOG.warn("could not find root dir or fs", e);
+ return Status.RESIGNED;
+ }
+ try {
+ if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
return Status.DONE;
}
- });
+ } catch (IOException e) {
+ LOG.warn("failed to process sync replication wal {}", name, e);
+ return Status.RESIGNED;
+ }
+ // TODO have to correctly figure out when log splitting has been
+ // interrupted or has encountered a transient error and when it has
+ // encountered a bad non-retry-able persistent error.
+ try {
+ if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
+ p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
+ factory)) {
+ return Status.PREEMPTED;
+ }
+ } catch (InterruptedIOException iioe) {
+ LOG.warn("log splitting of " + name + " interrupted, resigning", iioe);
+ return Status.RESIGNED;
+ } catch (IOException e) {
+ if (e instanceof FileNotFoundException) {
+ // A wal file may not exist anymore. Nothing can be recovered so move on
+ LOG.warn("WAL {} does not exist anymore", name, e);
+ return Status.DONE;
+ }
+ Throwable cause = e.getCause();
+ if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException ||
+ cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
+ LOG.warn("log replaying of " + name + " can't connect to the target regionserver, " +
+ "resigning", e);
+ return Status.RESIGNED;
+ } else if (cause instanceof InterruptedException) {
+ LOG.warn("log splitting of " + name + " interrupted, resigning", e);
+ return Status.RESIGNED;
+ }
+ LOG.warn("log splitting of " + name + " failed, returning error", e);
+ return Status.ERR;
+ }
+ return Status.DONE;
}
@Override
@@ -191,6 +266,7 @@ public class SplitLogWorker implements Runnable {
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher
*/
+ @FunctionalInterface
public interface TaskExecutor {
enum Status {
DONE(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
index 8ecfede..4301ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
* An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
*/
@InterfaceAudience.Private
-public abstract class CombinedAsyncWriter implements AsyncWriter {
+public final class CombinedAsyncWriter implements AsyncWriter {
private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
- protected final ImmutableList<AsyncWriter> writers;
+ private final ImmutableList<AsyncWriter> writers;
- protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
+ private CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
this.writers = writers;
}
@@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter {
}
}
- protected abstract void doSync(CompletableFuture<Long> future);
-
- @Override
- public CompletableFuture<Long> sync() {
- CompletableFuture<Long> future = new CompletableFuture<>();
- doSync(future);
- return future;
- }
-
@Override
public void append(Entry entry) {
writers.forEach(w -> w.append(entry));
}
- public enum Mode {
- SEQUENTIAL, PARALLEL
+ @Override
+ public CompletableFuture<Long> sync() {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ AtomicInteger remaining = new AtomicInteger(writers.size());
+ writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (remaining.decrementAndGet() == 0) {
+ future.complete(length);
+ }
+ }));
+ return future;
}
- public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
- ImmutableList<AsyncWriter> ws =
- ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build();
- switch (mode) {
- case SEQUENTIAL:
- return new CombinedAsyncWriter(ws) {
-
- private void doSync(CompletableFuture<Long> future, Long length, int index) {
- if (index == writers.size()) {
- future.complete(length);
- return;
- }
- writers.get(index).sync().whenComplete((len, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- doSync(future, len, index + 1);
- });
- }
-
- @Override
- protected void doSync(CompletableFuture<Long> future) {
- doSync(future, null, 0);
- }
- };
- case PARALLEL:
- return new CombinedAsyncWriter(ws) {
-
- @Override
- protected void doSync(CompletableFuture<Long> future) {
- AtomicInteger remaining = new AtomicInteger(writers.size());
- writers.forEach(w -> w.sync().whenComplete((length, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (remaining.decrementAndGet() == 0) {
- future.complete(length);
- }
- }));
- }
- };
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
+ public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... writers) {
+ return new CombinedAsyncWriter(
+ ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index a98567a..3967e78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -50,6 +51,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
this.remoteWalDir = remoteWalDir;
}
+ // will be overridden in testcase
+ @VisibleForTesting
+ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
+ AsyncWriter remoteWriter) {
+ return CombinedAsyncWriter.create(remoteWriter, localWriter);
+ }
+
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
@@ -66,8 +74,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
closeWriter(localWriter);
}
}
- return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
- localWriter);
+ return createCombinedAsyncWriter(localWriter, remoteWriter);
}
// Allow temporarily skipping the creation of remote writer. When failing to write to the remote
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2199415..b04f0cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
return syncReplicationPeerInfoProvider;
}
+
+ @Override
+ public ReplicationPeers getReplicationPeers() {
+ return replicationPeers;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index f25b073..827cfa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -652,7 +652,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException {
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 75274ea..170441b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
return false;
}
Pair<SyncReplicationState, SyncReplicationState> states =
- peer.getSyncReplicationStateAndNewState();
+ peer.getSyncReplicationStateAndNewState();
return checker.test(states.getFirst(), states.getSecond());
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 8a1f948..5b968db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -843,6 +843,15 @@ public abstract class FSUtils extends CommonFSUtils {
return frags;
}
+ public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
+ if (fs.exists(dst) && !fs.delete(dst, false)) {
+ throw new IOException("Can not delete " + dst);
+ }
+ if (!fs.rename(src, dst)) {
+ throw new IOException("Can not rename from " + src + " to " + dst);
+ }
+ }
+
/**
* A {@link PathFilter} that returns only regular files.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8e82d8b..82f8a89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDir
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -67,8 +70,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
+ // only for injecting errors for testcase, do not use it for other purpose.
@VisibleForTesting
- public static final String LOG_SUFFIX = ".syncrep";
+ public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
private final WALProvider provider;
@@ -126,12 +130,35 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
- return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
- ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
- CommonFSUtils.getWALRootDir(conf),
- ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
- getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
- conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+ Class<? extends DualAsyncFSWAL> clazz =
+ conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
+ try {
+ Constructor<?> constructor = null;
+ for (Constructor<?> c : clazz.getDeclaredConstructors()) {
+ if (c.getParameterCount() > 0) {
+ constructor = c;
+ break;
+ }
+ }
+ if (constructor == null) {
+ throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
+ }
+ constructor.setAccessible(true);
+ return (DualAsyncFSWAL) constructor.newInstance(
+ CommonFSUtils.getWALFileSystem(conf),
+ ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+ CommonFSUtils.getWALRootDir(conf),
+ ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
+ getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+ conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
+ eventLoopGroup, channelClass);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getTargetException();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new RuntimeException(cause);
+ }
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@@ -304,7 +331,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
* </p>
*/
public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
- if (!name.endsWith(LOG_SUFFIX)) {
+ if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
// fast path to return earlier if the name is not for a sync replication peer.
return Optional.empty();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c6ffeea..6462234 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -1141,7 +1141,7 @@ public class TestReplicationAdmin {
LOG.info("Expected error:", e);
}
TEST_UTIL.getTestFileSystem()
- .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
+ .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index 07aa6a8..f73b4f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,23 +37,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
+ HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter {
@Rule
public final TestName name = new TestName();
- @Parameter
- public CombinedAsyncWriter.Mode mode;
-
- @Parameters(name = "{index}: mode={0}")
- public static List<Object[]> params() {
- return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL },
- new Object[] { CombinedAsyncWriter.Mode.PARALLEL });
- }
-
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
@@ -125,7 +109,7 @@ public class TestCombinedAsyncWriter {
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
- CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) {
+ CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
columnCount, recordCount, row, timestamp);
try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
new file mode 100644
index 0000000..fb3daf2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
+class DualAsyncFSWALForTest extends DualAsyncFSWAL {
+
+ private boolean localBroken;
+
+ private boolean remoteBroken;
+
+ private CountDownLatch arrive;
+
+ private CountDownLatch resume;
+
+ private final class MyCombinedAsyncWriter implements AsyncWriter {
+
+ private final AsyncWriter localWriter;
+
+ private final AsyncWriter remoteWriter;
+
+ public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
+ this.localWriter = localWriter;
+ this.remoteWriter = remoteWriter;
+ }
+
+ @Override
+ public long getLength() {
+ return localWriter.getLength();
+ }
+
+ @Override
+ public void close() throws IOException {
+ Closeables.close(localWriter, true);
+ Closeables.close(remoteWriter, true);
+ }
+
+ @Override
+ public CompletableFuture<Long> sync() {
+ CompletableFuture<Long> localFuture;
+ CompletableFuture<Long> remoteFuture;
+ if (!localBroken) {
+ localFuture = localWriter.sync();
+ } else {
+ localFuture = new CompletableFuture<>();
+ localFuture.completeExceptionally(new IOException("Inject error"));
+ }
+ if (!remoteBroken) {
+ remoteFuture = remoteWriter.sync();
+ } else {
+ remoteFuture = new CompletableFuture<>();
+ remoteFuture.completeExceptionally(new IOException("Inject error"));
+ }
+ return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> {
+ return localFuture.getNow(0L);
+ });
+ }
+
+ @Override
+ public void append(Entry entry) {
+ if (!localBroken) {
+ localWriter.append(entry);
+ }
+ if (!remoteBroken) {
+ remoteWriter.append(entry);
+ }
+ }
+ }
+
+ public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+ String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
+ boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
+ Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+ super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
+ prefix, suffix, eventLoopGroup, channelClass);
+ }
+
+ @Override
+ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
+ AsyncWriter remoteWriter) {
+ return new MyCombinedAsyncWriter(localWriter, remoteWriter);
+ }
+
+ @Override
+ protected AsyncWriter createWriterInstance(Path path) throws IOException {
+ if (arrive != null) {
+ arrive.countDown();
+ try {
+ resume.await();
+ } catch (InterruptedException e) {
+ }
+ }
+ if (localBroken || remoteBroken) {
+ throw new IOException("WAL broken");
+ }
+ return super.createWriterInstance(path);
+ }
+
+ public void setLocalBroken() {
+ this.localBroken = true;
+ }
+
+ public void setRemoteBroken() {
+ this.remoteBroken = true;
+ }
+
+ public void suspendLogRoll() {
+ arrive = new CountDownLatch(1);
+ resume = new CountDownLatch(1);
+ }
+
+ public void waitUntilArrive() throws InterruptedException {
+ arrive.await();
+ }
+
+ public void resumeLogRoll() {
+ resume.countDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index de679be..095be90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -72,9 +72,9 @@ public class SyncReplicationTestBase {
protected static String PEER_ID = "1";
- protected static Path remoteWALDir1;
+ protected static Path REMOTE_WAL_DIR1;
- protected static Path remoteWALDir2;
+ protected static Path REMOTE_WAL_DIR2;
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster());
@@ -109,22 +109,22 @@ public class SyncReplicationTestBase {
UTIL2.getAdmin().createTable(td);
FileSystem fs1 = UTIL1.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem();
- remoteWALDir1 =
+ REMOTE_WAL_DIR1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
- remoteWALDir2 =
+ REMOTE_WAL_DIR2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
UTIL1.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
- .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+ .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
UTIL2.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
.setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
- .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+ .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index b663c44..fce0cdf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -37,8 +37,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
-
+ HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
@Test
public void testActive() throws Exception {
@@ -58,7 +57,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
// Ensure that there's no cluster id in remote log entries.
- verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID);
+ verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
new file mode 100644
index 0000000..cf8993b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalCopyToRemote.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalCopyToRemote extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+ DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+ UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+ DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+ SyncReplicationTestBase.setUp();
+ }
+
+ @Test
+ public void testSplitLog() throws Exception {
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+ HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+ DualAsyncFSWALForTest wal =
+ (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+ wal.setRemoteBroken();
+ try (AsyncConnection conn =
+ ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+ AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+ try {
+ table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))).get();
+ fail("Should fail since the rs will crash and we will not retry");
+ } catch (ExecutionException e) {
+ // expected
+ LOG.info("Expected error:", e);
+ }
+ }
+ UTIL1.waitFor(60000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+ return table.exists(new Get(Bytes.toBytes(0)));
+ }
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "The row is still not available";
+ }
+ });
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ // We should have copied the local log to remote, so we should be able to get the value
+ try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+ assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
new file mode 100644
index 0000000..9a6d242
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationMoreLogsInLocalGiveUpSplitting.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+ DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+ UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
+ DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
+ SyncReplicationTestBase.setUp();
+ }
+
+ @Test
+ public void testSplitLog() throws Exception {
+ UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+ UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+ try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+ table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
+ }
+ HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+ DualAsyncFSWALForTest wal =
+ (DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
+ wal.setRemoteBroken();
+ wal.suspendLogRoll();
+ try (AsyncConnection conn =
+ ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+ AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
+ .setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
+ try {
+ table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get();
+ fail("Should fail since the rs will hang and we will get a rpc timeout");
+ } catch (ExecutionException e) {
+ // expected
+ LOG.info("Expected error:", e);
+ }
+ }
+ wal.waitUntilArrive();
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ wal.resumeLogRoll();
+ try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+ assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
+ // we failed to write this entry to remote so it should not exist
+ assertFalse(table.exists(new Get(Bytes.toBytes(1))));
+ }
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ // make sure that the region is online. We cannot use waitTableAvailable since a table in
+ // standby state cannot be read from the client.
+ try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
+ try {
+ table.exists(new Get(Bytes.toBytes(0)));
+ } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+ // expected
+ assertThat(e.getMessage(), containsString("STANDBY"));
+ }
+ }
+ HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ // we give up splitting the whole wal file so this record will also be gone.
+ assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty());
+ UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+ // finally it should be replicated back
+ waitUntilReplicationDone(UTIL1, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 7d380c1..0cd1846 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -66,12 +65,12 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
SyncReplicationState.ACTIVE);
MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
- Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
+ Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
Path remoteWAL = remoteWALStatus[0].getPath();
- assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+ assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
@@ -81,7 +80,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
remoteWAL = remoteWALStatus[0].getPath();
- assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
+ assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 100, 200);
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
index 8526af8..de409fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -97,25 +97,25 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
// Remove the peers in ACTIVE & STANDBY cluster.
- FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration());
- Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
+ FileSystem fs2 = REMOTE_WAL_DIR2.getFileSystem(UTIL2.getConfiguration());
+ Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
- Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
- Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+ Assert.assertFalse(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
+ Assert.assertFalse(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
UTIL1.getAdmin().removeReplicationPeer(PEER_ID);
- verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1);
+ verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR1, UTIL1);
// Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the
// replay WAL dir will be removed after replaying all WALs, so create an empty dir here to test
// whether the removeReplicationPeer would remove the remoteWAL dir.
- fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID));
- fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID));
- Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID)));
- Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID)));
+ fs2.create(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID));
+ fs2.create(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID));
+ Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
+ Assert.assertTrue(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
UTIL2.getAdmin().removeReplicationPeer(PEER_ID);
- verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2);
+ verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR2, UTIL2);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index ebb21a4..2563669 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -151,7 +152,8 @@ public class TestRecoverStandbyProcedure {
}
private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
- Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID);
+ Path peerRemoteWALDir = ReplicationUtils
+ .getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID);
if (!fs.exists(peerRemoteWALDir)) {
fs.mkdirs(peerRemoteWALDir);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d98b7f85..febe764 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -618,7 +617,7 @@ public abstract class TestReplicationSourceManager {
try {
// make sure that we can deal with files which does not exist
String walNameNotExists =
- "remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX;
+ "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path wal = new Path(logDir, walNameNotExists);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
@@ -626,7 +625,7 @@ public abstract class TestReplicationSourceManager {
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
String walName =
- "remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX;
+ "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path remoteWAL =
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/46866d9f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 69ed44d..8189cef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -86,7 +86,6 @@ public class TestSyncReplicationWALProvider {
@Override
public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
- // TODO Implement SyncReplicationPeerInfoProvider.isInState
return false;
}
}