You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/24 03:28:01 UTC
[33/37] hbase git commit: HBASE-20576 Check remote WAL directory when creating peer and transiting peer to A
HBASE-20576 Check remote WAL directory when creating peer and transiting peer to A
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/24c30aea
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/24c30aea
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/24c30aea
Branch: refs/heads/HBASE-19064
Commit: 24c30aea367ca2725d76196ebcbae4c6de327b35
Parents: 63cb212
Author: zhangduo <zh...@apache.org>
Authored: Tue May 15 15:07:40 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 24 11:13:58 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerManager.java | 19 +++--
...ransitPeerSyncReplicationStateProcedure.java | 73 +++++++++++++-------
.../replication/TestReplicationAdmin.java | 57 ++++++++++++---
3 files changed, 110 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/24c30aea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index e1d8b51..8e49137 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -193,9 +194,9 @@ public class ReplicationPeerManager {
}
/**
- * @return the old state, and whether the peer is enabled.
+ * @return the old description of the peer
*/
- Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId,
+ ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
@@ -204,7 +205,7 @@ public class ReplicationPeerManager {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId);
}
- return Pair.newPair(fromState, desc.isEnabled());
+ return desc;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -384,6 +385,16 @@ public class ReplicationPeerManager {
"Only support replicated table config for sync replication peer");
}
}
+ Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
+ if (!remoteWALDir.isAbsolute()) {
+ throw new DoNotRetryIOException(
+ "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
+ }
+ URI remoteWALDirUri = remoteWALDir.toUri();
+ if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
+ throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() +
+ " is not qualified, you must provide scheme and authority");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/24c30aea/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 0175296..ebe7a93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
@@ -31,9 +32,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,10 +114,20 @@ public class TransitPeerSyncReplicationStateProcedure
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
}
- Pair<SyncReplicationState, Boolean> pair =
+ ReplicationPeerDescription desc =
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
- fromState = pair.getFirst();
- enabled = pair.getSecond();
+ if (toState == SyncReplicationState.ACTIVE) {
+ Path remoteWALDirForPeer =
+ ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
+ // check whether the remote wal directory is present
+ if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
+ .exists(remoteWALDirForPeer)) {
+ throw new DoNotRetryIOException(
+ "The remote WAL directory " + remoteWALDirForPeer + " does not exist");
+ }
+ }
+ fromState = desc.getSyncReplicationState();
+ enabled = desc.isEnabled();
}
private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -152,6 +163,36 @@ public class TransitPeerSyncReplicationStateProcedure
}
}
+ private void setNextStateAfterRefreshBegin() {
+ if (fromState.equals(SyncReplicationState.ACTIVE)) {
+ setNextState(toState.equals(SyncReplicationState.STANDBY)
+ ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+ : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
+ setNextState(toState.equals(SyncReplicationState.STANDBY)
+ ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+ : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ } else {
+ assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+ setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+ }
+ }
+
+ private void setNextStateAfterRefreshEnd() {
+ if (toState == SyncReplicationState.STANDBY) {
+ setNextState(
+ enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+ : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+ } else {
+ setNextState(
+ PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+ }
+ }
+
+ private void replayRemoteWAL() {
+ addChildProcedure(new RecoverStandbyProcedure[] { new RecoverStandbyProcedure(peerId) });
+ }
+
@Override
protected Flow executeFromState(MasterProcedureEnv env,
PeerSyncReplicationStateTransitionState state)
@@ -191,21 +232,10 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
.toArray(RefreshPeerProcedure[]::new));
- if (fromState.equals(SyncReplicationState.ACTIVE)) {
- setNextState(toState.equals(SyncReplicationState.STANDBY)
- ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
- : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
- } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
- setNextState(toState.equals(SyncReplicationState.STANDBY)
- ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
- : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
- } else {
- assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
- setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
- }
+ setNextStateAfterRefreshBegin();
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
- addChildProcedure(new RecoverStandbyProcedure(peerId));
+ replayRemoteWAL();
setNextState(
PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
@@ -248,14 +278,7 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new));
- if (toState == SyncReplicationState.STANDBY) {
- setNextState(
- enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
- : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
- } else {
- setNextState(
- PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
- }
+ setNextStateAfterRefreshEnd();
return Flow.HAS_MORE_STATE;
case SYNC_REPLICATION_SET_PEER_ENABLED:
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/24c30aea/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index ac98283..c6ffeea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
@@ -981,34 +982,37 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
assertNull(rpc.getRemoteWALDir());
+ builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
try {
- builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
- builder.setRemoteWALDir(rootDir);
+ builder.setRemoteWALDir("whatever");
try {
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
builder.setReplicateAllUserTables(false);
+ Set<String> namespaces = new HashSet<String>();
+ namespaces.add("ns1");
+ builder.setNamespaces(namespaces);
try {
- Set<String> namespaces = new HashSet<String>();
- namespaces.add("ns1");
- builder.setNamespaces(namespaces);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
builder.setNamespaces(null);
@@ -1017,21 +1021,41 @@ public class TestReplicationAdmin {
fail("Only support replicated table config for sync replication, and tables can't be empty");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
Map<TableName, List<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tableName, Arrays.asList("cf1"));
+ builder.setTableCFsMap(tableCfs);
try {
- tableCfs.put(tableName, Arrays.asList("cf1"));
- builder.setTableCFsMap(tableCfs);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
fail("Only support replicated table config for sync replication");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
tableCfs = new HashMap<>();
tableCfs.put(tableName, new ArrayList<>());
builder.setTableCFsMap(tableCfs);
+ try {
+ hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+ fail("The remote WAL dir must be absolute");
+ } catch (Exception e) {
+ // OK
+ LOG.info("Expected error:", e);
+ }
+
+ builder.setRemoteWALDir("/hbase/remoteWALs");
+ try {
+ hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+ fail("The remote WAL dir must be qualified");
+ } catch (Exception e) {
+ // OK
+ LOG.info("Expected error:", e);
+ }
+
+ builder.setRemoteWALDir(rootDir);
hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
assertEquals(rootDir, rpc.getRemoteWALDir());
@@ -1042,6 +1066,7 @@ public class TestReplicationAdmin {
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
try {
@@ -1050,6 +1075,7 @@ public class TestReplicationAdmin {
fail("Change remote wal dir is not allowed");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
try {
@@ -1062,6 +1088,7 @@ public class TestReplicationAdmin {
"Change replicated table config on an existing synchronous peer is not allowed");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
}
@@ -1079,13 +1106,13 @@ public class TestReplicationAdmin {
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
SyncReplicationState.DOWNGRADE_ACTIVE);
- fail("Can't transit cluster state if replication peer don't config remote wal dir");
+ fail("Can't transit sync replication state if replication peer don't config remote wal dir");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
- TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND));
builder = ReplicationPeerConfig.newBuilder();
builder.setClusterKey(KEY_SECOND);
builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
@@ -1106,6 +1133,15 @@ public class TestReplicationAdmin {
assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+ try {
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
+ fail("Can't transit sync replication state to ACTIVE if remote wal dir does not exist");
+ } catch (Exception e) {
+ // OK
+ LOG.info("Expected error:", e);
+ }
+ TEST_UTIL.getTestFileSystem()
+ .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
@@ -1133,9 +1169,10 @@ public class TestReplicationAdmin {
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
try {
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
- fail("Can't transit cluster state from STANDBY to ACTIVE");
+ fail("Can't transit sync replication state from STANDBY to ACTIVE");
} catch (Exception e) {
// OK
+ LOG.info("Expected error:", e);
}
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
SyncReplicationState.DOWNGRADE_ACTIVE);