You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/29 12:35:44 UTC
[17/50] [abbrv] hbase git commit: HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad6af567
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad6af567
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad6af567
Branch: refs/heads/HBASE-19397-branch-2
Commit: ad6af5678b5b7dcf0835039a8f33ae75a4ea91a6
Parents: a4196dc
Author: huzheng <op...@gmail.com>
Authored: Tue Dec 26 16:46:10 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jan 29 20:34:11 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfigUtil.java | 10 +-
.../replication/VerifyReplication.java | 9 +-
.../hbase/replication/ReplicationFactory.java | 10 +-
.../hbase/replication/ReplicationPeerImpl.java | 60 +-
.../replication/ReplicationPeerStorage.java | 3 +-
.../hbase/replication/ReplicationPeers.java | 238 ++++----
.../replication/ReplicationPeersZKImpl.java | 552 -------------------
.../replication/ZKReplicationPeerStorage.java | 12 +-
.../replication/ZKReplicationStorageBase.java | 3 +-
.../replication/TestReplicationStateBasic.java | 125 ++---
.../replication/TestReplicationStateZKImpl.java | 2 +-
.../TestZKReplicationPeerStorage.java | 12 +-
.../cleaner/ReplicationZKNodeCleaner.java | 57 +-
.../replication/ReplicationPeerManager.java | 6 +-
.../regionserver/DumpReplicationQueues.java | 2 +-
.../regionserver/PeerProcedureHandlerImpl.java | 49 +-
.../replication/regionserver/Replication.java | 2 +-
.../regionserver/ReplicationSource.java | 7 +-
.../regionserver/ReplicationSourceManager.java | 45 +-
.../cleaner/TestReplicationHFileCleaner.java | 7 +-
.../replication/TestMultiSlaveReplication.java | 2 -
.../TestReplicationTrackerZKImpl.java | 36 +-
.../TestReplicationSourceManager.java | 17 +-
.../hadoop/hbase/HBaseZKTestingUtility.java | 3 +-
24 files changed, 308 insertions(+), 961 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 022bf64..a234a9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil {
public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
throws DeserializationException {
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
+ int pbLen = ProtobufUtil.lengthOfPBMagic();
ReplicationProtos.ReplicationPeer.Builder builder =
ReplicationProtos.ReplicationPeer.newBuilder();
ReplicationProtos.ReplicationPeer peer;
try {
- ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+ ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
peer = builder.build();
} catch (IOException e) {
throw new DeserializationException(e);
}
return convert(peer);
} else {
- if (bytes.length > 0) {
- return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
+ if (bytes == null || bytes.length <= 0) {
+ throw new DeserializationException("Bytes to deserialize should not be empty.");
}
- return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
+ return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 09d4b4b..f0070f0 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -339,15 +339,10 @@ public class VerifyReplication extends Configured implements Tool {
@Override public boolean isAborted() {return false;}
});
- ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
+ ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
rp.init();
- Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
- if (pair == null) {
- throw new IOException("Couldn't get peer conf!");
- }
-
- return pair;
+ return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
} catch (ReplicationException e) {
throw new IOException(
"An error occurred while trying to connect to the remove peer cluster", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 5e70e57..6c66aff 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -29,14 +29,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class ReplicationFactory {
- public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
- Abortable abortable) {
- return getReplicationPeers(zk, conf, null, abortable);
- }
-
- public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
- ReplicationQueueStorage queueStorage, Abortable abortable) {
- return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
+ public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) {
+ return new ReplicationPeers(zk, conf);
}
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 2c7ea9b..3e17025 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -18,28 +18,16 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@InterfaceAudience.Private
public class ReplicationPeerImpl implements ReplicationPeer {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
-
- private final ReplicationPeerStorage peerStorage;
-
private final Configuration conf;
private final String id;
@@ -57,21 +45,21 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
- public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
+ public ReplicationPeerImpl(Configuration conf, String id, boolean peerState,
ReplicationPeerConfig peerConfig) {
- this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
this.conf = conf;
- this.peerConfig = peerConfig;
this.id = id;
+ this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
+ this.peerConfig = peerConfig;
this.peerConfigListeners = new ArrayList<>();
}
- public void refreshPeerState() throws ReplicationException {
- this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
+ void setPeerState(boolean enabled) {
+ this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
}
- public void refreshPeerConfig() throws ReplicationException {
- this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
+ void setPeerConfig(ReplicationPeerConfig peerConfig) {
+ this.peerConfig = peerConfig;
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
}
@@ -134,36 +122,4 @@ public class ReplicationPeerImpl implements ReplicationPeer {
public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
this.peerConfigListeners.add(listener);
}
-
- /**
- * Parse the raw data from ZK to get a peer's state
- * @param bytes raw ZK data
- * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
- * @throws DeserializationException
- */
- public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ReplicationProtos.ReplicationState.State.ENABLED == state;
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pbLen = ProtobufUtil.lengthOfPBMagic();
- ReplicationProtos.ReplicationState.Builder builder =
- ReplicationProtos.ReplicationState.newBuilder();
- ReplicationProtos.ReplicationState state;
- try {
- ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
- state = builder.build();
- return state.getState();
- } catch (IOException e) {
- throw new DeserializationException(e);
- }
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index e00cd0d..1adda02 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
-import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience;
@@ -70,5 +69,5 @@ public interface ReplicationPeerStorage {
* Get the peer config of a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
- Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException;
+ ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index afc19bd..e58482e 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,58 +17,53 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.io.IOException;
import java.util.Set;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
- * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
- * clusters that data is replicated to. A peer cluster can be in three different states:
- *
- * 1. Not-Registered - There is no notion of the peer cluster.
- * 2. Registered - The peer has an id and is being tracked but there is no connection.
- * 3. Connected - There is an active connection to the remote peer.
- *
- * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ * This provides a class for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to.
*/
@InterfaceAudience.Private
-public interface ReplicationPeers {
+public class ReplicationPeers {
- /**
- * Initialize the ReplicationPeers interface.
- */
- void init() throws ReplicationException;
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
- /**
- * Add a new remote slave cluster for replication.
- * @param peerId a short that identifies the cluster
- * @param peerConfig configuration for the replication slave cluster
- */
- default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
- throws ReplicationException {
- registerPeer(peerId, peerConfig, true);
+ private final Configuration conf;
+
+ // Map of peer clusters keyed by their id
+ private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
+ private final ReplicationPeerStorage peerStorage;
+
+ protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+ this.conf = conf;
+ this.peerCache = new ConcurrentHashMap<>();
+ this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
}
- /**
- * Add a new remote slave cluster for replication.
- * @param peerId a short that identifies the cluster
- * @param peerConfig configuration for the replication slave cluster
- * @param enabled peer state, true if ENABLED and false if DISABLED
- */
- void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException;
+ public void init() throws ReplicationException {
+ // Loading all existing peerIds into peer cache.
+ for (String peerId : this.peerStorage.listPeerIds()) {
+ addPeer(peerId);
+ }
+ }
- /**
- * Removes a remote slave cluster and stops the replication to it.
- * @param peerId a short that identifies the cluster
- */
- void unregisterPeer(String peerId) throws ReplicationException;
+ @VisibleForTesting
+ public ReplicationPeerStorage getPeerStorage() {
+ return this.peerStorage;
+ }
/**
* Method called after a peer has been connected. It will create a ReplicationPeer to track the
@@ -78,111 +72,115 @@ public interface ReplicationPeers {
* @return whether a ReplicationPeer was successfully created
* @throws ReplicationException
*/
- boolean peerConnected(String peerId) throws ReplicationException;
-
- /**
- * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
- * tracked the disconnected cluster.
- * @param peerId a short that identifies the cluster
- */
- void peerDisconnected(String peerId);
+ public boolean addPeer(String peerId) throws ReplicationException {
+ if (this.peerCache.containsKey(peerId)) {
+ return false;
+ }
- /**
- * Restart the replication to the specified remote slave cluster.
- * @param peerId a short that identifies the cluster
- */
- void enablePeer(String peerId) throws ReplicationException;
-
- /**
- * Stop the replication to the specified remote slave cluster.
- * @param peerId a short that identifies the cluster
- */
- void disablePeer(String peerId) throws ReplicationException;
+ peerCache.put(peerId, createPeer(peerId));
+ return true;
+ }
- /**
- * Get the table and column-family list string of the peer from the underlying storage.
- * @param peerId a short that identifies the cluster
- */
- public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
- throws ReplicationException;
+ public void removePeer(String peerId) {
+ peerCache.remove(peerId);
+ }
/**
- * Set the table and column-family list string of the peer to the underlying storage.
+ * Get the peer state for the specified connected remote slave cluster. The value might be read
+ * from cache, so it is recommended to use {@link #peerStorage } to read storage directly if
+ * reading the state after enabling or disabling it.
* @param peerId a short that identifies the cluster
- * @param tableCFs the table and column-family list which will be replicated for this peer
+ * @return true if replication is enabled, false otherwise.
*/
- public void setPeerTableCFsConfig(String peerId,
- Map<TableName, ? extends Collection<String>> tableCFs)
- throws ReplicationException;
+ public boolean isPeerEnabled(String peerId) {
+ ReplicationPeer replicationPeer = this.peerCache.get(peerId);
+ if (replicationPeer == null) {
+ throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
+ }
+ return replicationPeer.getPeerState() == PeerState.ENABLED;
+ }
/**
- * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
- * continue to track changes to the Peer's state and config. This method returns null if no
- * peer has been connected with the given peerId.
+ * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
+ * continue to track changes to the Peer's state and config. This method returns null if no peer
+ * has been cached with the given peerId.
* @param peerId id for the peer
* @return ReplicationPeer object
*/
- ReplicationPeerImpl getConnectedPeer(String peerId);
+ public ReplicationPeerImpl getPeer(String peerId) {
+ return peerCache.get(peerId);
+ }
/**
* Returns the set of peerIds of the clusters that have been connected and have an underlying
* ReplicationPeer.
* @return a Set of Strings for peerIds
*/
- public Set<String> getConnectedPeerIds();
+ public Set<String> getAllPeerIds() {
+ return peerCache.keySet();
+ }
- /**
- * Get the replication status for the specified connected remote slave cluster.
- * The value might be read from cache, so it is recommended to
- * use {@link #getStatusOfPeerFromBackingStore(String)}
- * if reading the state after enabling or disabling it.
- * @param peerId a short that identifies the cluster
- * @return true if replication is enabled, false otherwise.
- */
- boolean getStatusOfPeer(String peerId);
+ public ReplicationPeerConfig getPeerConfig(String peerId) {
+ ReplicationPeer replicationPeer = this.peerCache.get(peerId);
+ if (replicationPeer == null) {
+ throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
+ }
+ return replicationPeer.getPeerConfig();
+ }
- /**
- * Get the replication status for the specified remote slave cluster, which doesn't
- * have to be connected. The state is read directly from the backing store.
- * @param peerId a short that identifies the cluster
- * @return true if replication is enabled, false otherwise.
- * @throws ReplicationException thrown if there's an error contacting the store
- */
- boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
+ public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
+ ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
- /**
- * List the cluster replication configs of all remote slave clusters (whether they are
- * enabled/disabled or connected/disconnected).
- * @return A map of peer ids to peer cluster keys
- */
- Map<String, ReplicationPeerConfig> getAllPeerConfigs();
+ Configuration otherConf;
+ try {
+ otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+ } catch (IOException e) {
+ throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
+ }
- /**
- * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
- * connected/disconnected).
- * @return A list of peer ids
- */
- List<String> getAllPeerIds();
+ if (!peerConfig.getConfiguration().isEmpty()) {
+ CompoundConfiguration compound = new CompoundConfiguration();
+ compound.add(otherConf);
+ compound.addStringMap(peerConfig.getConfiguration());
+ return compound;
+ }
- /**
- * Returns the configured ReplicationPeerConfig for this peerId
- * @param peerId a short name that identifies the cluster
- * @return ReplicationPeerConfig for the peer
- */
- ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
+ return otherConf;
+ }
- /**
- * Returns the configuration needed to talk to the remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return the configuration for the peer cluster, null if it was unable to get the configuration
- */
- Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
+ public PeerState refreshPeerState(String peerId) throws ReplicationException {
+ ReplicationPeerImpl peer = peerCache.get(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ peer.setPeerState(peerStorage.isPeerEnabled(peerId));
+ return peer.getPeerState();
+ }
+
+ public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
+ ReplicationPeerImpl peer = peerCache.get(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
+ return peer.getPeerConfig();
+ }
/**
+<<<<<<< 2bb2fd611d4b88c724a2b561f10433b56c6fd3dd
* Update the peerConfig for the a given peer cluster
* @param id a short that identifies the cluster
* @param peerConfig new config for the peer cluster
* @throws ReplicationException
- */
- void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
+=======
+ * Helper method to connect to a peer
+ * @param peerId peer's identifier
+ * @return object representing the peer
+>>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
+ */
+ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
+ ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+ boolean enabled = peerStorage.isPeerEnabled(peerId);
+ return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
deleted file mode 100644
index 7f6498d..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
- * peers znode contains a list of all peer replication clusters and the current replication state of
- * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
- * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
- * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
- * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
- * For example:
- *
- * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
- * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
- *
- * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
- * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
- * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
- * ReplicationPeer.PeerStateTracker class. For example:
- *
- * /hbase/replication/peers/1/peer-state [Value: ENABLED]
- *
- * Each of these peer znodes has a child znode that indicates which data will be replicated
- * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
- * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
- * class. For example:
- *
- * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
- */
-@InterfaceAudience.Private
-public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
-
- // Map of peer clusters keyed by their id
- private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
- private final ReplicationQueueStorage queueStorage;
- private Abortable abortable;
-
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
-
- public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
- ReplicationQueueStorage queueStorage, Abortable abortable) {
- super(zk, conf, abortable);
- this.abortable = abortable;
- this.peerClusters = new ConcurrentHashMap<>();
- this.queueStorage = queueStorage;
- }
-
- @Override
- public void init() throws ReplicationException {
- try {
- if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Could not initialize replication peers", e);
- }
- addExistingPeers();
- }
-
- @Override
- public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException {
- try {
- if (peerExists(id)) {
- throw new IllegalArgumentException("Cannot add a peer with id=" + id
- + " because that id already exists.");
- }
-
- if(id.contains("-")){
- throw new IllegalArgumentException("Found invalid peer name:" + id);
- }
-
- if (peerConfig.getClusterKey() != null) {
- try {
- ZKConfig.validateClusterKey(peerConfig.getClusterKey());
- } catch (IOException ioe) {
- throw new IllegalArgumentException(ioe.getMessage());
- }
- }
-
- checkQueuesDeleted(id);
-
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-
- List<ZKUtilOp> listOfOps = new ArrayList<>(2);
- ZKUtilOp op1 =
- ZKUtilOp.createAndFailSilent(getPeerNode(id),
- ReplicationPeerConfigUtil.toByteArray(peerConfig));
- ZKUtilOp op2 =
- ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
- : DISABLED_ZNODE_BYTES);
- listOfOps.add(op1);
- listOfOps.add(op2);
- ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
- } catch (KeeperException e) {
- throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
- + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
- }
- }
-
- @Override
- public void unregisterPeer(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot remove peer with id=" + id
- + " because that id does not exist.");
- }
- ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
- } catch (KeeperException e) {
- throw new ReplicationException("Could not remove peer with id=" + id, e);
- }
- }
-
- @Override
- public void enablePeer(String id) throws ReplicationException {
- changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
- LOG.info("peer " + id + " is enabled");
- }
-
- @Override
- public void disablePeer(String id) throws ReplicationException {
- changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
- LOG.info("peer " + id + " is disabled");
- }
-
- @Override
- public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " doesn't exist");
- }
- try {
- ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
- if (rpc == null) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
- }
- return rpc.getTableCFsMap();
- } catch (Exception e) {
- throw new ReplicationException(e);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
- }
- }
-
- @Override
- public void setPeerTableCFsConfig(String id,
- Map<TableName, ? extends Collection<String>> tableCFs)
- throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
- + " does not exist.");
- }
- ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
- if (rpc == null) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
- }
- rpc.setTableCFsMap(tableCFs);
- ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationPeerConfigUtil.toByteArray(rpc));
- LOG.info("Peer tableCFs with id= " + id + " is now " +
- ReplicationPeerConfigUtil.convertToString(tableCFs));
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
- }
- }
-
- @Override
- public boolean getStatusOfPeer(String id) {
- ReplicationPeer replicationPeer = this.peerClusters.get(id);
- if (replicationPeer == null) {
- throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
- }
- return replicationPeer.getPeerState() == PeerState.ENABLED;
- }
-
- @Override
- public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " doesn't exist");
- }
- String peerStateZNode = getPeerStateNode(id);
- try {
- return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
- } catch (KeeperException e) {
- throw new ReplicationException(e);
- } catch (DeserializationException e) {
- throw new ReplicationException(e);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to get status of the peer with id=" + id +
- " from backing store", e);
- } catch (InterruptedException e) {
- throw new ReplicationException(e);
- }
- }
-
- @Override
- public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
- Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- for (String id : ids) {
- ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
- if (peerConfig == null) {
- LOG.warn("Failed to get replication peer configuration of clusterid=" + id
- + " znode content, continuing.");
- continue;
- }
- peers.put(id, peerConfig);
- }
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- } catch (ReplicationException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return peers;
- }
-
- @Override
- public ReplicationPeerImpl getConnectedPeer(String peerId) {
- return peerClusters.get(peerId);
- }
-
- @Override
- public Set<String> getConnectedPeerIds() {
- return peerClusters.keySet(); // this is not thread-safe
- }
-
- /**
- * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
- */
- @Override
- public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
- throws ReplicationException {
- String znode = getPeerNode(peerId);
- byte[] data = null;
- try {
- data = ZKUtil.getData(this.zookeeper, znode);
- } catch (InterruptedException e) {
- LOG.warn("Could not get configuration for peer because the thread " +
- "was interrupted. peerId=" + peerId);
- Thread.currentThread().interrupt();
- return null;
- } catch (KeeperException e) {
- throw new ReplicationException("Error getting configuration for peer with id="
- + peerId, e);
- }
- if (data == null) {
- LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
- return null;
- }
-
- try {
- return ReplicationPeerConfigUtil.parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed to parse cluster key from peerId=" + peerId
- + ", specifically the content from the following znode: " + znode);
- return null;
- }
- }
-
- @Override
- public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
- throws ReplicationException {
- ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
-
- if (peerConfig == null) {
- return null;
- }
-
- Configuration otherConf;
- try {
- otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
- } catch (IOException e) {
- LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
- return null;
- }
-
- if (!peerConfig.getConfiguration().isEmpty()) {
- CompoundConfiguration compound = new CompoundConfiguration();
- compound.add(otherConf);
- compound.addStringMap(peerConfig.getConfiguration());
- return new Pair<>(peerConfig, compound);
- }
-
- return new Pair<>(peerConfig, otherConf);
- }
-
- @Override
- public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
- throws ReplicationException {
- ReplicationPeer peer = getConnectedPeer(id);
- if (peer == null){
- throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
- }
- ReplicationPeerConfig existingConfig = peer.getPeerConfig();
- if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
- throw new ReplicationException(
- "Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
- existingConfig.getClusterKey() + "' does not match new key '" +
- newConfig.getClusterKey() + "'");
- }
- if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
- existingConfig.getReplicationEndpointImpl())) {
- throw new ReplicationException("Changing the replication endpoint implementation class " +
- "on an existing peer is not allowed. Existing class '" +
- existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
- newConfig.getReplicationEndpointImpl() + "'");
- }
-
- // Update existingConfig's peer config and peer data with the new values, but don't touch config
- // or data that weren't explicitly changed
- ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
- builder.putAllConfiguration(newConfig.getConfiguration())
- .putAllPeerData(newConfig.getPeerData())
- .setReplicateAllUserTables(newConfig.replicateAllUserTables())
- .setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap())
- .setExcludeNamespaces(newConfig.getExcludeNamespaces())
- .setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
- .setBandwidth(newConfig.getBandwidth());
-
- try {
- ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationPeerConfigUtil.toByteArray(builder.build()));
- }
- catch(KeeperException ke){
- throw new ReplicationException("There was a problem trying to save changes to the " +
- "replication peer " + id, ke);
- }
- }
-
- /**
- * List all registered peer clusters and set a watch on their znodes.
- */
- @Override
- public List<String> getAllPeerIds() {
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return ids;
- }
-
- /**
- * A private method used during initialization. This method attempts to add all registered
- * peer clusters. This method does not set a watch on the peer cluster znodes.
- */
- private void addExistingPeers() throws ReplicationException {
- List<String> znodes = null;
- try {
- znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- throw new ReplicationException("Error getting the list of peer clusters.", e);
- }
- if (znodes != null) {
- for (String z : znodes) {
- createAndAddPeer(z);
- }
- }
- }
-
- @Override
- public boolean peerConnected(String peerId) throws ReplicationException {
- return createAndAddPeer(peerId);
- }
-
- @Override
- public void peerDisconnected(String peerId) {
- ReplicationPeer rp = this.peerClusters.get(peerId);
- if (rp != null) {
- peerClusters.remove(peerId, rp);
- }
- }
-
- /**
- * Attempt to connect to a new remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return true if a new connection was made, false if no new connection was made.
- */
- public boolean createAndAddPeer(String peerId) throws ReplicationException {
- if (peerClusters == null) {
- return false;
- }
- if (this.peerClusters.containsKey(peerId)) {
- return false;
- }
-
- ReplicationPeerImpl peer = null;
- try {
- peer = createPeer(peerId);
- } catch (Exception e) {
- throw new ReplicationException("Error adding peer with id=" + peerId, e);
- }
- if (peer == null) {
- return false;
- }
- ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
- if (previous == null) {
- LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
- } else {
- LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
- ", new cluster=" + peer.getPeerConfig().getClusterKey());
- }
- return true;
- }
-
- /**
- * Update the state znode of a peer cluster.
- * @param id
- * @param state
- */
- private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
- throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
- + " does not exist.");
- }
- String peerStateZNode = getPeerStateNode(id);
- byte[] stateBytes =
- (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
- : DISABLED_ZNODE_BYTES;
- if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
- ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
- } else {
- ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
- }
- LOG.info("Peer with id= " + id + " is now " + state.name());
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
- }
- }
-
- /**
- * Helper method to connect to a peer
- * @param peerId peer's identifier
- * @return object representing the peer
- * @throws ReplicationException
- */
- private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
- Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
- if (pair == null) {
- return null;
- }
- Configuration peerConf = pair.getSecond();
-
- ReplicationPeerImpl peer =
- new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
-
- // Load peer state and peer config by reading zookeeper directly.
- peer.refreshPeerState();
- peer.refreshPeerConfig();
-
- return peer;
- }
-
- private void checkQueuesDeleted(String peerId) throws ReplicationException {
- if (queueStorage == null) {
- return;
- }
- try {
- List<ServerName> replicators = queueStorage.getListOfReplicators();
- if (replicators == null || replicators.isEmpty()) {
- return;
- }
- for (ServerName replicator : replicators) {
- List<String> queueIds = queueStorage.getAllQueues(replicator);
- for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
- throw new IllegalArgumentException("undeleted queue for peerId: " + peerId
- + ", replicator: " + replicator + ", queueId: " + queueId);
- }
- }
- }
- // Check for hfile-refs queue
- if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
- && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
- throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
- + ", found in hfile-refs node path " + hfileRefsZNode);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
- }
- }
-
- /**
- * For replication peer cluster key or endpoint class, null and empty string is same. So here
- * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
- */
- private boolean isStringEquals(String s1, String s2) {
- if (StringUtils.isBlank(s1)) {
- return StringUtils.isBlank(s2);
- }
- return s1.equals(s2);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 49af4c3..bf448e8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import java.util.Arrays;
import java.util.List;
-import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@@ -144,7 +143,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
}
@Override
- public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException {
+ public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
byte[] data;
try {
data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
@@ -152,13 +151,14 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
}
if (data == null || data.length == 0) {
- return Optional.empty();
+ throw new ReplicationException(
+ "Replication peer config data shouldn't be empty, peerId=" + peerId);
}
try {
- return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data));
+ return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
- LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e);
- return Optional.empty();
+ throw new ReplicationException(
+ "Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index b8a2044..d09a56b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -48,8 +48,7 @@ class ZKReplicationStorageBase {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
this.replicationZNode =
- ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
-
+ ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 4afda5d..2589199 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -55,7 +55,6 @@ public abstract class TestReplicationStateBasic {
protected static String KEY_TWO;
// For testing when we try to replicate to ourself
- protected String OUR_ID = "3";
protected String OUR_KEY;
protected static int zkTimeoutCount;
@@ -152,37 +151,6 @@ public abstract class TestReplicationStateBasic {
}
@Test
- public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
- rp.init();
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
- fail("Should throw an IllegalArgumentException because " +
- "zookeeper.znode.parent is missing leading '/'.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
- fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
- fail("Should throw an IllegalArgumentException because " +
- "hbase.zookeeper.property.clientPort is missing.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
- }
-
- @Test
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
rp.init();
@@ -192,7 +160,8 @@ public abstract class TestReplicationStateBasic {
files1.add(new Pair<>(null, new Path("file_3")));
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
rqs.addPeerToHFileRefs(ID_ONE);
rqs.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
@@ -208,15 +177,17 @@ public abstract class TestReplicationStateBasic {
hfiles2.add(removedString);
rqs.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
- rp.unregisterPeer(ID_ONE);
+ rp.getPeerStorage().removePeer(ID_ONE);
}
@Test
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rp.init();
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
rqs.addPeerToHFileRefs(ID_ONE);
- rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+ rp.getPeerStorage().addPeer(ID_TWO,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
rqs.addPeerToHFileRefs(ID_TWO);
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
@@ -229,13 +200,13 @@ public abstract class TestReplicationStateBasic {
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
- rp.unregisterPeer(ID_ONE);
+ rp.getPeerStorage().removePeer(ID_ONE);
rqs.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
- rp.unregisterPeer(ID_TWO);
+ rp.getPeerStorage().removePeer(ID_TWO);
rqs.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
@@ -245,74 +216,77 @@ public abstract class TestReplicationStateBasic {
public void testReplicationPeers() throws Exception {
rp.init();
- // Test methods with non-existent peer ids
try {
- rp.unregisterPeer("bogus");
+ rp.getPeerStorage().setPeerState("bogus", true);
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
+ } catch (ReplicationException e) {
}
try {
- rp.enablePeer("bogus");
+ rp.getPeerStorage().setPeerState("bogus", false);
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
+ } catch (ReplicationException e) {
}
try {
- rp.disablePeer("bogus");
+ rp.isPeerEnabled("bogus");
fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
} catch (IllegalArgumentException e) {
}
+
try {
- rp.getStatusOfPeer("bogus");
- fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
+ assertFalse(rp.addPeer("bogus"));
+ fail("Should have thrown an ReplicationException when passed a bogus peerId");
+ } catch (ReplicationException e) {
+ }
+
+ try {
+ assertNull(rp.getPeerClusterConfiguration("bogus"));
+ fail("Should have thrown an ReplicationException when passed a bogus peerId");
+ } catch (ReplicationException e) {
}
- assertFalse(rp.peerConnected("bogus"));
- rp.peerDisconnected("bogus");
- assertNull(rp.getPeerConf("bogus"));
assertNumberOfPeers(0);
// Add some peers
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
+ rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
assertNumberOfPeers(1);
- rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
+ rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
assertNumberOfPeers(2);
// Test methods with a peer that is added but not connected
try {
- rp.getStatusOfPeer(ID_ONE);
+ rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
- assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
- rp.unregisterPeer(ID_ONE);
- rp.peerDisconnected(ID_ONE);
+ assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+ rp.getPeerStorage().removePeer(ID_ONE);
+ rp.removePeer(ID_ONE);
assertNumberOfPeers(1);
// Add one peer
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
- rp.peerConnected(ID_ONE);
+ rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+ rp.addPeer(ID_ONE);
assertNumberOfPeers(2);
- assertTrue(rp.getStatusOfPeer(ID_ONE));
- rp.disablePeer(ID_ONE);
+ assertTrue(rp.isPeerEnabled(ID_ONE));
+ rp.getPeerStorage().setPeerState(ID_ONE, false);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
- ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
- peer.refreshPeerState();
+ ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
+ rp.refreshPeerState(peer.getId());
assertEquals(PeerState.DISABLED, peer.getPeerState());
assertConnectedPeerStatus(false, ID_ONE);
- rp.enablePeer(ID_ONE);
+ rp.getPeerStorage().setPeerState(ID_ONE, true);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
- peer.refreshPeerState();
+ rp.refreshPeerState(peer.getId());
assertEquals(PeerState.ENABLED, peer.getPeerState());
assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer
- rp.peerDisconnected(ID_ONE);
+ rp.removePeer(ID_ONE);
assertNumberOfPeers(2);
try {
- rp.getStatusOfPeer(ID_ONE);
+ rp.isPeerEnabled(ID_ONE);
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
@@ -320,16 +294,16 @@ public abstract class TestReplicationStateBasic {
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
// we can first check if the value was changed in the store, if it wasn't then fail right away
- if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
+ if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
}
while (true) {
- if (status == rp.getStatusOfPeer(peerId)) {
+ if (status == rp.isPeerEnabled(peerId)) {
return;
}
if (zkTimeoutCount < ZK_MAX_COUNT) {
- LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status +
- ", sleeping and trying again.");
+ LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+ + ", sleeping and trying again.");
Thread.sleep(ZK_SLEEP_INTERVAL);
} else {
fail("Timed out waiting for ConnectedPeerStatus to be " + status);
@@ -337,10 +311,8 @@ public abstract class TestReplicationStateBasic {
}
}
- protected void assertNumberOfPeers(int total) {
- assertEquals(total, rp.getAllPeerConfigs().size());
- assertEquals(total, rp.getAllPeerIds().size());
- assertEquals(total, rp.getAllPeerIds().size());
+ protected void assertNumberOfPeers(int total) throws ReplicationException {
+ assertEquals(total, rp.getPeerStorage().listPeerIds().size());
}
/*
@@ -359,8 +331,9 @@ public abstract class TestReplicationStateBasic {
rqs.addWAL(server3, "qId" + i, "filename" + j);
}
// Add peers for the corresponding queues so they are not orphans
- rp.registerPeer("qId" + i,
- new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
+ rp.getPeerStorage().addPeer("qId" + i,
+ ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
+ true);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index ac869d9..6825c36 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -79,7 +79,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
public void setUp() {
zkTimeoutCount = 0;
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
- rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
+ rp = ReplicationFactory.getReplicationPeers(zkw, conf);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index a3be1e6..e8098c8 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
@@ -143,14 +144,14 @@ public class TestZKReplicationPeerStorage {
assertEquals(peerCount, peerIds.size());
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get());
+ assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
}
for (String peerId : peerIds) {
int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get());
+ assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
}
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
@@ -166,6 +167,11 @@ public class TestZKReplicationPeerStorage {
peerIds = STORAGE.listPeerIds();
assertEquals(peerCount - 1, peerIds.size());
assertFalse(peerIds.contains(toRemove));
- assertFalse(STORAGE.getPeerConfig(toRemove).isPresent());
+
+ try {
+ STORAGE.getPeerConfig(toRemove);
+ fail("Should throw a ReplicationException when get peer config of a peerId");
+ } catch (ReplicationException e) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
index af41399..f2c3ec9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
@@ -51,20 +50,14 @@ import org.slf4j.LoggerFactory;
public class ReplicationZKNodeCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
private final ReplicationQueueStorage queueStorage;
- private final ReplicationPeers replicationPeers;
+ private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueDeletor queueDeletor;
public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException {
- try {
- this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
- this.replicationPeers =
- ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable);
- this.replicationPeers.init();
- this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
- } catch (ReplicationException e) {
- throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
- }
+ this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+ this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+ this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
}
/**
@@ -73,8 +66,8 @@ public class ReplicationZKNodeCleaner {
*/
public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
- Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
try {
+ Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<ServerName> replicators = this.queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) {
return undeletedQueues;
@@ -84,8 +77,7 @@ public class ReplicationZKNodeCleaner {
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
- undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(
- queueId);
+ undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
if (LOG.isDebugEnabled()) {
LOG.debug("Undeleted replication queue for removed peer found: "
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
@@ -106,9 +98,9 @@ public class ReplicationZKNodeCleaner {
*/
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
Set<String> undeletedHFileRefsQueue = new HashSet<>();
- Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
try {
+ Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
Set<String> peers = new HashSet<>(listOfPeers);
peers.removeAll(peerIds);
@@ -116,15 +108,15 @@ public class ReplicationZKNodeCleaner {
undeletedHFileRefsQueue.addAll(peers);
}
} catch (ReplicationException e) {
- throw new IOException(
- "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e);
+ throw new IOException("Failed to get list of all peers from hfile-refs znode "
+ + hfileRefsZNode, e);
}
return undeletedHFileRefsQueue;
}
private class ReplicationQueueDeletor extends ReplicationStateZKBase {
- public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
+ ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
super(zk, conf, abortable);
}
@@ -132,19 +124,20 @@ public class ReplicationZKNodeCleaner {
* @param replicator The regionserver which has undeleted queue
* @param queueId The undeleted queue id
*/
- public void removeQueue(final ServerName replicator, final String queueId) throws IOException {
- String queueZnodePath = ZNodePaths
- .joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId);
+ void removeQueue(final ServerName replicator, final String queueId) throws IOException {
+ String queueZnodePath =
+ ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
+ queueId);
try {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
+ if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
- LOG.info("Successfully removed replication queue, replicator: " + replicator +
- ", queueId: " + queueId);
+ LOG.info("Successfully removed replication queue, replicator: " + replicator
+ + ", queueId: " + queueId);
}
- } catch (KeeperException e) {
- throw new IOException(
- "Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId);
+ } catch (ReplicationException | KeeperException e) {
+ throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+ + queueId, e); // chain the cause so the underlying ZK/storage failure is not lost
}
}
@@ -152,17 +145,17 @@ public class ReplicationZKNodeCleaner {
* @param hfileRefsQueueId The undeleted hfile-refs queue id
* @throws IOException
*/
- public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
+ void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
try {
- if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+ if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) {
ZKUtil.deleteNodeRecursively(this.zookeeper, node);
LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+ hfileRefsZNode);
}
- } catch (KeeperException e) {
+ } catch (ReplicationException | KeeperException e) {
throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
- + " from path " + hfileRefsZNode);
+ + " from path " + hfileRefsZNode, e);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index f4ccce8..b6732d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -314,12 +314,12 @@ public class ReplicationPeerManager {
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
- ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+ ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
- Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
+ ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
- peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
+ peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 73e600e..27bda2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -310,7 +310,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationPeers =
- ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
+ ReplicationFactory.getReplicationPeers(zkw, getConf());
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable());
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 598357c..1efe180 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,9 +19,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,8 @@ import org.slf4j.LoggerFactory;
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
- private ReplicationSourceManager replicationSourceManager;
+ private final ReplicationSourceManager replicationSourceManager;
+ private final ReentrantLock peersLock = new ReentrantLock();
public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
this.replicationSourceManager = replicationSourceManager;
@@ -38,45 +40,40 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void addPeer(String peerId) throws ReplicationException, IOException {
- replicationSourceManager.addPeer(peerId);
+ peersLock.lock();
+ try {
+ replicationSourceManager.addPeer(peerId);
+ } finally {
+ peersLock.unlock();
+ }
}
@Override
public void removePeer(String peerId) throws ReplicationException, IOException {
- replicationSourceManager.removePeer(peerId);
+ peersLock.lock();
+ try {
+ if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
+ replicationSourceManager.removePeer(peerId);
+ }
+ } finally {
+ peersLock.unlock();
+ }
}
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
- ReplicationPeerImpl peer =
- replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
- if (peer != null) {
- peer.refreshPeerState();
- LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
- } else {
- throw new ReplicationException("No connected peer found, peerId=" + peerId);
- }
+ PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+ LOG.info("disable replication peer, id: " + peerId + ", new state: " + newState);
}
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
- ReplicationPeerImpl peer =
- replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
- if (peer != null) {
- peer.refreshPeerState();
- LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
- } else {
- throw new ReplicationException("No connected peer found, peerId=" + peerId);
- }
+ PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+ LOG.info("enable replication peer, id: " + peerId + ", new state: " + newState);
}
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
- ReplicationPeerImpl peer =
- replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
- if (peer == null) {
- throw new ReplicationException("No connected peer found, peerId=" + peerId);
- }
- peer.refreshPeerConfig();
+ replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index d555c6e..a8991a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -130,7 +130,7 @@ public class Replication implements
this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
this.replicationPeers =
- ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
+ ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init();
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
http://git-wip-us.apache.org/repos/asf/hbase/blob/ad6af567/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 58ea6ee..8250992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -222,8 +222,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0];
}
- Map<TableName, List<String>> tableCFMap =
- replicationPeers.getConnectedPeer(peerId).getTableCFs();
+ Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
@@ -371,7 +370,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
private long getCurrentBandwidth() {
- ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
+ ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
@@ -416,7 +415,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*/
@Override
public boolean isPeerEnabled() {
- return this.replicationPeers.getStatusOfPeer(this.peerId);
+ return this.replicationPeers.isPeerEnabled(this.peerId);
}
@Override