You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/06 03:35:36 UTC
[27/48] hbase git commit: HBASE-19543 Abstract a replication storage
interface to extract the zk specific code
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
deleted file mode 100644
index b6f8784..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Manages and performs all replication admin operations.
- * <p>
- * Used to add/remove a replication peer.
- */
-@InterfaceAudience.Private
-public class ReplicationManager {
- private final ReplicationQueuesClient replicationQueuesClient;
- private final ReplicationPeers replicationPeers;
-
- public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
- throws IOException {
- try {
- this.replicationQueuesClient = ReplicationFactory
- .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
- this.replicationQueuesClient.init();
- this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
- this.replicationQueuesClient, abortable);
- this.replicationPeers.init();
- } catch (Exception e) {
- throw new IOException("Failed to construct ReplicationManager", e);
- }
- }
-
- public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException {
- checkPeerConfig(peerConfig);
- replicationPeers.registerPeer(peerId, peerConfig, enabled);
- replicationPeers.peerConnected(peerId);
- }
-
- public void removeReplicationPeer(String peerId) throws ReplicationException {
- replicationPeers.peerDisconnected(peerId);
- replicationPeers.unregisterPeer(peerId);
- }
-
- public void enableReplicationPeer(String peerId) throws ReplicationException {
- this.replicationPeers.enablePeer(peerId);
- }
-
- public void disableReplicationPeer(String peerId) throws ReplicationException {
- this.replicationPeers.disablePeer(peerId);
- }
-
- public ReplicationPeerConfig getPeerConfig(String peerId)
- throws ReplicationException, ReplicationPeerNotFoundException {
- ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
- if (peerConfig == null) {
- throw new ReplicationPeerNotFoundException(peerId);
- }
- return peerConfig;
- }
-
- public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
- throws ReplicationException, IOException {
- checkPeerConfig(peerConfig);
- this.replicationPeers.updatePeerConfig(peerId, peerConfig);
- }
-
- public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
- throws ReplicationException {
- List<ReplicationPeerDescription> peers = new ArrayList<>();
- List<String> peerIds = replicationPeers.getAllPeerIds();
- for (String peerId : peerIds) {
- if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
- peers.add(new ReplicationPeerDescription(peerId,
- replicationPeers.getStatusOfPeerFromBackingStore(peerId),
- replicationPeers.getReplicationPeerConfig(peerId)));
- }
- }
- return peers;
- }
-
- /**
- * If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
- * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
- * peer cluster.
- *
- * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
- * Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
- */
- private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
- if (peerConfig.replicateAllUserTables()) {
- if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
- (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
- throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
- "when you want replicate all cluster");
- }
- checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
- peerConfig.getExcludeTableCFsMap());
- } else {
- if ((peerConfig.getExcludeNamespaces() != null
- && !peerConfig.getExcludeNamespaces().isEmpty())
- || (peerConfig.getExcludeTableCFsMap() != null
- && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
- throw new IllegalArgumentException(
- "Need clean exclude-namespaces or exclude-table-cfs config firstly"
- + " when replicate_all flag is false");
- }
- checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
- peerConfig.getTableCFsMap());
- }
- checkConfiguredWALEntryFilters(peerConfig);
- }
-
- /**
- * Set a namespace in the peer config means that all tables in this namespace will be replicated
- * to the peer cluster.
- * <ol>
- * <li>If peer config already has a namespace, then not allow set any table of this namespace to
- * the peer config.</li>
- * <li>If peer config already has a table, then not allow set this table's namespace to the peer
- * config.</li>
- * </ol>
- * <p>
- * Set a exclude namespace in the peer config means that all tables in this namespace can't be
- * replicated to the peer cluster.
- * <ol>
- * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
- * this namespace to the peer config.</li>
- * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
- * exclude namespace.</li>
- * </ol>
- */
- private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
- Map<TableName, ? extends Collection<String>> tableCfs) {
- if (namespaces == null || namespaces.isEmpty()) {
- return;
- }
- if (tableCfs == null || tableCfs.isEmpty()) {
- return;
- }
- for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
- TableName table = entry.getKey();
- if (namespaces.contains(table.getNamespaceAsString())) {
- throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
- + table.getNamespaceAsString() + " in peer config");
- }
- }
- }
-
- private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
- String filterCSV = peerConfig.getConfiguration()
- .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
- if (filterCSV != null && !filterCSV.isEmpty()) {
- String[] filters = filterCSV.split(",");
- for (String filter : filters) {
- try {
- Class.forName(filter).newInstance();
- } catch (Exception e) {
- throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
- " could not be created. Failing add/update " + "peer operation.", e);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
new file mode 100644
index 0000000..5abd874
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Manages and performs all replication admin operations.
+ * <p>
+ * Used to add/remove a replication peer.
+ */
+@InterfaceAudience.Private
+public final class ReplicationPeerManager {
+
+ private final ReplicationPeerStorage peerStorage;
+
+ private final ReplicationQueueStorage queueStorage;
+
+ private final ConcurrentMap<String, ReplicationPeerDescription> peers;
+
+  /**
+   * Wires the manager to its two backing stores and to an already populated peer map. Private:
+   * instances are built by {@link #create(ZKWatcher, Configuration)}.
+   * @param peerStorage storage used for peer state and peer config
+   * @param queueStorage storage used to inspect replication queues
+   * @param peers in-memory view of the known peers, keyed by peer id
+   */
+  private ReplicationPeerManager(ReplicationPeerStorage peerStorage,
+      ReplicationQueueStorage queueStorage,
+      ConcurrentMap<String, ReplicationPeerDescription> peers) {
+    this.peers = peers;
+    this.queueStorage = queueStorage;
+    this.peerStorage = peerStorage;
+  }
+
+  /**
+   * Verifies that neither a replication queue nor the hfile-refs queue still refers to the given
+   * peer id.
+   * @param peerId the peer id to look for
+   * @throws DoNotRetryIOException if some replicator still has a queue whose parsed peer id
+   *           equals {@code peerId}, or if the hfile-refs queue still lists the peer
+   * @throws ReplicationException declared for the queue storage look-ups
+   */
+  private void checkQueuesDeleted(String peerId)
+      throws ReplicationException, DoNotRetryIOException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      for (String queueId : queueStorage.getAllQueues(replicator)) {
+        // the queue id is parsed so that only its peer id part is compared
+        String owningPeerId = new ReplicationQueueInfo(queueId).getPeerId();
+        if (!owningPeerId.equals(peerId)) {
+          continue;
+        }
+        throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
+          ", replicator: " + replicator + ", queueId: " + queueId);
+      }
+    }
+    boolean stillInHFileRefs = queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId);
+    if (stillInHFileRefs) {
+      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
+    }
+  }
+
+ public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
+ throws DoNotRetryIOException, ReplicationException {
+ if (peerId.contains("-")) {
+ throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
+ }
+ checkPeerConfig(peerConfig);
+ if (peers.containsKey(peerId)) {
+ throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
+ }
+ // make sure that there is no queues with the same peer id. This may happen when we create a
+ // peer with the same id with a old deleted peer. If the replication queues for the old peer
+ // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
+ // file may also be replicated.
+ checkQueuesDeleted(peerId);
+ }
+
+  /**
+   * Looks the peer up in the in-memory peer map.
+   * @param peerId id of the peer to look up
+   * @return the description held for {@code peerId}
+   * @throws DoNotRetryIOException if the map has no entry for {@code peerId}
+   */
+  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
+    ReplicationPeerDescription found = peers.get(peerId);
+    if (found != null) {
+      return found;
+    }
+    throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
+  }
+
+  /**
+   * Validates a request to remove a peer.
+   * @param peerId id of the peer to remove
+   * @throws DoNotRetryIOException if the peer is not in the in-memory peer map
+   */
+  public void preRemovePeer(String peerId) throws DoNotRetryIOException {
+    // only existence matters here; the returned description is not needed
+    checkPeerExists(peerId);
+  }
+
+ public void preEnablePeer(String peerId) throws DoNotRetryIOException {
+ ReplicationPeerDescription desc = checkPeerExists(peerId);
+ if (desc.isEnabled()) {
+ throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
+ }
+ }
+
+ public void preDisablePeer(String peerId) throws DoNotRetryIOException {
+ ReplicationPeerDescription desc = checkPeerExists(peerId);
+ if (!desc.isEnabled()) {
+ throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
+ }
+ }
+
+ public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+ throws DoNotRetryIOException {
+ checkPeerConfig(peerConfig);
+ ReplicationPeerDescription desc = checkPeerExists(peerId);
+ ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
+ if (!StringUtils.isBlank(peerConfig.getClusterKey()) &&
+ !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) {
+ throw new DoNotRetryIOException(
+ "Changing the cluster key on an existing peer is not allowed. Existing key '" +
+ oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
+ peerConfig.getClusterKey() + "'");
+ }
+
+ if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) &&
+ !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) {
+ throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
+ "on an existing peer is not allowed. Existing class '" +
+ oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
+ " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
+ }
+ }
+
+  /**
+   * Builds a new {@link ReplicationPeerConfig} carrying the same settings as the given one.
+   * <p>
+   * Configuration and peer data are copied entry by entry into the new object's own maps. The
+   * table-cfs maps and namespace sets are handed to the setters as they are; whether the setters
+   * copy them defensively is not visible here.
+   * @param peerConfig the config to copy from
+   * @return a new config object with the copied settings
+   */
+  private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
+    ReplicationPeerConfig duplicate = new ReplicationPeerConfig();
+    // identity of the peer
+    duplicate.setClusterKey(peerConfig.getClusterKey());
+    duplicate.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+    // what gets replicated
+    duplicate.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
+    duplicate.setNamespaces(peerConfig.getNamespaces());
+    duplicate.setTableCFsMap(peerConfig.getTableCFsMap());
+    duplicate.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
+    duplicate.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
+    // remaining settings
+    duplicate.setBandwidth(peerConfig.getBandwidth());
+    duplicate.getConfiguration().putAll(peerConfig.getConfiguration());
+    duplicate.getPeerData().putAll(peerConfig.getPeerData());
+    return duplicate;
+  }
+
+  /**
+   * Persists a new peer and then records it in the in-memory peer map. Does nothing when the map
+   * already holds the id, which is treated as a retry of an earlier call.
+   * @param peerId id of the peer to add
+   * @param peerConfig config of the peer; a copy of it is what gets stored
+   * @param enabled initial state of the peer
+   * @throws ReplicationException declared for the peer storage write
+   */
+  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
+      throws ReplicationException {
+    if (!peers.containsKey(peerId)) {
+      ReplicationPeerConfig stored = copy(peerConfig);
+      // write to the storage first; the map is only touched once that call has returned
+      peerStorage.addPeer(peerId, stored, enabled);
+      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, stored));
+    }
+    // otherwise this should be a retry, nothing to do
+  }
+
+  /**
+   * Removes the peer from the peer storage and then from the in-memory peer map. Does nothing
+   * when the map no longer holds the id, which is treated as a retry of an earlier call.
+   * @param peerId id of the peer to remove
+   * @throws ReplicationException declared for the peer storage write
+   */
+  public void removePeer(String peerId) throws ReplicationException {
+    if (peers.containsKey(peerId)) {
+      peerStorage.removePeer(peerId);
+      peers.remove(peerId);
+    }
+    // otherwise this should be a retry, nothing to do
+  }
+
+  /**
+   * Persists the requested enabled/disabled state and replaces the in-memory description with one
+   * carrying the new state and the unchanged peer config. Does nothing when the peer is already
+   * in the requested state, which is treated as a retry.
+   * <p>
+   * NOTE(review): the peer is looked up without a null check, so an unknown {@code peerId} fails
+   * with a NullPointerException; presumably callers run the matching pre-check first - confirm.
+   * @param peerId id of the peer to change
+   * @param enabled the state to switch to
+   * @throws ReplicationException declared for the peer storage write
+   */
+  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
+    ReplicationPeerDescription current = peers.get(peerId);
+    if (current.isEnabled() != enabled) {
+      peerStorage.setPeerState(peerId, enabled);
+      peers.put(peerId,
+        new ReplicationPeerDescription(peerId, enabled, current.getPeerConfig()));
+    }
+    // otherwise this should be a retry, nothing to do
+  }
+
+  /**
+   * Switches the peer to the enabled state; see {@code setPeerState} for the details.
+   * @param peerId id of the peer to enable
+   * @throws ReplicationException declared for the peer storage write
+   */
+  public void enablePeer(String peerId) throws ReplicationException {
+    final boolean enabled = true;
+    setPeerState(peerId, enabled);
+  }
+
+  /**
+   * Switches the peer to the disabled state; see {@code setPeerState} for the details.
+   * @param peerId id of the peer to disable
+   * @throws ReplicationException declared for the peer storage write
+   */
+  public void disablePeer(String peerId) throws ReplicationException {
+    final boolean enabled = false;
+    setPeerState(peerId, enabled);
+  }
+
+ public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+ throws ReplicationException {
+ // the checking rules are too complicated here so we give up checking whether this is a retry.
+ ReplicationPeerDescription desc = peers.get(peerId);
+ ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
+ ReplicationPeerConfig newPeerConfig = copy(peerConfig);
+ // we need to use the new conf to overwrite the old one.
+ newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
+ newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
+ newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
+ newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
+
+ peerStorage.updatePeerConfig(peerId, newPeerConfig);
+ peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
+ }
+
+  /**
+   * Lists the peers held in the in-memory peer map.
+   * @param pattern regular expression the whole peer id has to match, or {@code null} to list
+   *          every peer
+   * @return a new list with the matching peer descriptions
+   */
+  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
+    Collection<ReplicationPeerDescription> known = peers.values();
+    if (pattern == null) {
+      return new ArrayList<>(known);
+    }
+    List<ReplicationPeerDescription> matching = new ArrayList<>();
+    for (ReplicationPeerDescription candidate : known) {
+      if (pattern.matcher(candidate.getPeerId()).matches()) {
+        matching.add(candidate);
+      }
+    }
+    return matching;
+  }
+
+  /**
+   * Returns the config held in memory for the given peer.
+   * @param peerId id of the peer
+   * @return the peer's config, or an empty {@link Optional} if the peer map has no such peer
+   */
+  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
+    ReplicationPeerDescription found = peers.get(peerId);
+    if (found == null) {
+      return Optional.empty();
+    }
+    return Optional.of(found.getPeerConfig());
+  }
+
+  /**
+   * Validates the namespace/table-cfs settings of a peer config against its replicate_all flag,
+   * then validates the configured WALEntryFilters.
+   * <p>
+   * If the replicate_all flag is true, all user tables are replicated to the peer cluster. Only
+   * exclude-namespaces and exclude-table-cfs may be set then; namespaces and table-cfs must be
+   * empty.
+   * <p>
+   * If the replicate_all flag is false, no user table is replicated by default. Only namespaces
+   * and table-cfs may be set then; exclude-namespaces and exclude-table-cfs must be empty.
+   * @param peerConfig the config to validate
+   * @throws DoNotRetryIOException if a setting that must be empty is set, if the permitted
+   *           namespaces and table-cfs conflict with each other, or if a configured
+   *           WALEntryFilter can not be created
+   */
+  private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    boolean replicateAll = peerConfig.replicateAllUserTables();
+
+    // the pair of settings that has to stay empty in the current mode ...
+    Set<String> forbiddenNamespaces;
+    Map<TableName, ? extends Collection<String>> forbiddenTableCfs;
+    // ... and the pair that may be used, which is checked for internal conflicts
+    Set<String> permittedNamespaces;
+    Map<TableName, ? extends Collection<String>> permittedTableCfs;
+    if (replicateAll) {
+      forbiddenNamespaces = peerConfig.getNamespaces();
+      forbiddenTableCfs = peerConfig.getTableCFsMap();
+      permittedNamespaces = peerConfig.getExcludeNamespaces();
+      permittedTableCfs = peerConfig.getExcludeTableCFsMap();
+    } else {
+      forbiddenNamespaces = peerConfig.getExcludeNamespaces();
+      forbiddenTableCfs = peerConfig.getExcludeTableCFsMap();
+      permittedNamespaces = peerConfig.getNamespaces();
+      permittedTableCfs = peerConfig.getTableCFsMap();
+    }
+
+    boolean namespacesSet = forbiddenNamespaces != null && !forbiddenNamespaces.isEmpty();
+    boolean tableCfsSet = forbiddenTableCfs != null && !forbiddenTableCfs.isEmpty();
+    if (namespacesSet || tableCfsSet) {
+      if (replicateAll) {
+        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
+          "when you want replicate all cluster");
+      }
+      throw new DoNotRetryIOException(
+        "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
+          " when replicate_all flag is false");
+    }
+    checkNamespacesAndTableCfsConfigConflict(permittedNamespaces, permittedTableCfs);
+    checkConfiguredWALEntryFilters(peerConfig);
+  }
+
+  /**
+   * Rejects a config in which a table is listed although its whole namespace is listed as well.
+   * <p>
+   * Setting a namespace in the peer config means that all tables in this namespace are
+   * replicated to the peer cluster. Therefore:
+   * <ol>
+   * <li>If the peer config already has a namespace, no table of that namespace may be set in the
+   * peer config.</li>
+   * <li>If the peer config already has a table, that table's namespace may not be set in the
+   * peer config.</li>
+   * </ol>
+   * <p>
+   * Setting an exclude namespace in the peer config means that no table in this namespace is
+   * replicated to the peer cluster. Therefore:
+   * <ol>
+   * <li>If the peer config already has an exclude namespace, no exclude table of that namespace
+   * may be set in the peer config.</li>
+   * <li>If the peer config already has an exclude table, that table's namespace may not be set
+   * as an exclude namespace.</li>
+   * </ol>
+   * Nothing is checked when either argument is null or empty.
+   * @param namespaces the namespaces, or exclude namespaces, of the peer config
+   * @param tableCfs the table-cfs, or exclude table-cfs, of the peer config
+   * @throws DoNotRetryIOException if the namespace of some table in {@code tableCfs} is
+   *           contained in {@code namespaces}
+   */
+  private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
+    boolean noNamespaces = namespaces == null || namespaces.isEmpty();
+    if (noNamespaces || tableCfs == null || tableCfs.isEmpty()) {
+      return;
+    }
+    // only the table names matter for the conflict check, the column families are not looked at
+    for (TableName table : tableCfs.keySet()) {
+      if (!namespaces.contains(table.getNamespaceAsString())) {
+        continue;
+      }
+      throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " +
+        table.getNamespaceAsString() + " in peer config");
+    }
+  }
+
+  /**
+   * Checks that every class named in the peer's WALEntryFilter setting can be loaded and
+   * instantiated through its no-argument constructor.
+   * <p>
+   * The setting is read from the peer configuration under
+   * {@link BaseReplicationEndpoint#REPLICATION_WALENTRYFILTER_CONFIG_KEY} and split on ','. The
+   * names are used exactly as written, without trimming. A missing or empty setting passes.
+   * @param peerConfig the peer config whose filter setting is checked
+   * @throws DoNotRetryIOException if one of the named classes can not be loaded or instantiated;
+   *           the underlying failure is attached as the cause
+   */
+  private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    String filterCSV = peerConfig.getConfiguration()
+      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+    if (filterCSV == null || filterCSV.isEmpty()) {
+      return;
+    }
+    for (String filter : filterCSV.split(",")) {
+      try {
+        // Class#newInstance() is deprecated and rethrows whatever the constructor throws; going
+        // through the Constructor wraps such failures in an InvocationTargetException instead.
+        Class.forName(filter).getDeclaredConstructor().newInstance();
+      } catch (Exception | LinkageError e) {
+        // LinkageError covers classes that are found but fail to link or initialize, so a broken
+        // filter class is reported like any other bad filter instead of escaping as an Error.
+        throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
+          " could not be created. Failing add/update " + "peer operation.", e);
+      }
+    }
+  }
+
+  /**
+   * Creates a manager whose in-memory peer map is loaded from the peer storage.
+   * <p>
+   * For every peer id listed by the peer storage, the stored config and enabled flag are read
+   * and cached as a {@link ReplicationPeerDescription}.
+   * @param zk ZooKeeper watcher handed to the storage factory
+   * @param conf configuration handed to the storage factory
+   * @return the initialized manager
+   * @throws ReplicationException if reading from the peer storage fails, or if a listed peer has
+   *           no stored config
+   */
+  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
+      throws ReplicationException {
+    ReplicationPeerStorage peerStorage =
+      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
+    for (String peerId : peerStorage.listPeerIds()) {
+      // A peer can be listed while its config is absent; report that through the declared
+      // exception instead of the unchecked NoSuchElementException of Optional#get().
+      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId)
+        .orElseThrow(() -> new ReplicationException(
+          "Replication peer " + peerId + " is listed in the peer storage but has no config"));
+      boolean enabled = peerStorage.isPeerEnabled(peerId);
+      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
+    }
+    return new ReplicationPeerManager(peerStorage,
+      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index d8154dc..a43532d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
+ env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
}
@Override
- protected void updatePeerStorage(MasterProcedureEnv env)
- throws IllegalArgumentException, Exception {
- env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
+ protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
+ env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index fb29e9e..c2fcd8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hbase.client.replication;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
@@ -31,18 +37,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -50,15 +55,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Unit testing of ReplicationAdmin
@@ -66,8 +62,6 @@ import static org.junit.Assert.fail;
@Category({MediumTests.class, ClientTests.class})
public class TestReplicationAdmin {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestReplicationAdmin.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
@@ -102,16 +96,17 @@ public class TestReplicationAdmin {
}
@After
- public void cleanupPeer() {
- try {
- hbaseAdmin.removeReplicationPeer(ID_ONE);
- } catch (Exception e) {
- LOG.debug("Replication peer " + ID_ONE + " may already be removed");
+ public void tearDown() throws Exception {
+ for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
+ hbaseAdmin.removeReplicationPeer(desc.getPeerId());
}
- try {
- hbaseAdmin.removeReplicationPeer(ID_SECOND);
- } catch (Exception e) {
- LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
+ ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+ .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+ for (ServerName serverName : queueStorage.getListOfReplicators()) {
+ for (String queue : queueStorage.getAllQueues(serverName)) {
+ queueStorage.removeQueue(serverName, queue);
+ }
+ queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
}
}
@@ -201,32 +196,29 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
Configuration conf = TEST_UTIL.getConfiguration();
- ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
- ReplicationQueues repQueues =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
- repQueues.init("server1");
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
+ ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
// add queue for ID_ONE
- repQueues.addLog(ID_ONE, "file1");
+ queueStorage.addWAL(serverName, ID_ONE, "file1");
try {
admin.addPeer(ID_ONE, rpc1, null);
fail();
} catch (Exception e) {
// OK!
}
- repQueues.removeQueue(ID_ONE);
- assertEquals(0, repQueues.getAllQueues().size());
+ queueStorage.removeQueue(serverName, ID_ONE);
+ assertEquals(0, queueStorage.getAllQueues(serverName).size());
// add recovered queue for ID_ONE
- repQueues.addLog(ID_ONE + "-server2", "file1");
+ queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
try {
admin.addPeer(ID_ONE, rpc2, null);
fail();
} catch (Exception e) {
// OK!
}
- repQueues.removeAllQueues();
- zkw.close();
}
/**
@@ -422,7 +414,7 @@ public class TestReplicationAdmin {
tableCFs.clear();
tableCFs.put(tableName2, null);
admin.removePeerTableCFs(ID_ONE, tableCFs);
- assertTrue(false);
+ fail();
} catch (ReplicationException e) {
}
tableCFs.clear();
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 8442530..7196b7c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.master;
import static org.mockito.Mockito.mock;
+import com.google.protobuf.Service;
+
import java.io.IOException;
import java.util.List;
@@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.replication.ReplicationManager;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -56,8 +58,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import com.google.protobuf.Service;
-
public class MockNoopMasterServices implements MasterServices, Server {
private final Configuration conf;
private final MetricsMaster metricsMaster;
@@ -462,7 +462,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
- public ProcedureEvent getInitializedEvent() {
+ public ProcedureEvent<?> getInitializedEvent() {
return null;
}
@@ -477,7 +477,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
- public ReplicationManager getReplicationManager() {
+ public ReplicationPeerManager getReplicationPeerManager() {
return null;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index fd44c89..f5d36bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -270,7 +272,7 @@ public class TestMasterNoCluster {
@Override
void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
- KeeperException, CoordinatedStateException {
+ KeeperException, CoordinatedStateException, ReplicationException {
super.initializeZKBasedSystemTrackers();
// Record a newer server in server manager at first
getServerManager().recordNewServerWithLock(newServer,
http://git-wip-us.apache.org/repos/asf/hbase/blob/52123806/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
index 1675496..24bb4d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -47,9 +46,6 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
*/
@Test(timeout = 600000)
public void testDisableInactivePeer() throws Exception {
-
- // enabling and shutdown the peer
- admin.enablePeer("2");
utility2.shutdownMiniHBaseCluster();
byte[] rowkey = Bytes.toBytes("disable inactive peer");