You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/04 01:23:55 UTC
[21/35] hbase git commit: HBASE-19573 Rewrite ReplicationPeer with
the new replication storage interface
HBASE-19573 Rewrite ReplicationPeer with the new replication storage interface
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bbbda610
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bbbda610
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bbbda610
Branch: refs/heads/HBASE-19397
Commit: bbbda610f946a8b9d9811ef567a1729dc29b5279
Parents: 0d05d79
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Dec 26 11:39:34 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jan 4 09:22:35 2018 +0800
----------------------------------------------------------------------
.../replication/VerifyReplication.java | 5 -
.../hbase/replication/ReplicationPeer.java | 42 ++--
.../hbase/replication/ReplicationPeerImpl.java | 170 ++++++++++++++
.../replication/ReplicationPeerZKImpl.java | 233 -------------------
.../hbase/replication/ReplicationPeers.java | 4 +-
.../replication/ReplicationPeersZKImpl.java | 23 +-
.../replication/TestReplicationStateBasic.java | 7 +-
.../regionserver/PeerProcedureHandlerImpl.java | 29 +--
8 files changed, 217 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 04db45d..64ef279 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -330,7 +329,6 @@ public class VerifyReplication extends Configured implements Tool {
private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
final Configuration conf, String peerId) throws IOException {
ZKWatcher localZKW = null;
- ReplicationPeerZKImpl peer = null;
try {
localZKW = new ZKWatcher(conf, "VerifyReplication",
new Abortable() {
@@ -351,9 +349,6 @@ public class VerifyReplication extends Configured implements Tool {
throw new IOException(
"An error occurred while trying to connect to the remove peer cluster", e);
} finally {
- if (peer != null) {
- peer.close();
- }
if (localZKW != null) {
localZKW.close();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index b66d76d..4846018 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
-
/**
* ReplicationPeer manages enabled / disabled state for the peer.
*/
@@ -49,65 +48,52 @@ public interface ReplicationPeer {
String getId();
/**
- * Get the peer config object
- * @return the ReplicationPeerConfig for this peer
- */
- public ReplicationPeerConfig getPeerConfig();
-
- /**
- * Get the peer config object. if loadFromBackingStore is true, it will load from backing store
- * directly and update its load peer config. otherwise, just return the local cached peer config.
- * @return the ReplicationPeerConfig for this peer
- */
- public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
- throws ReplicationException;
-
- /**
* Returns the state of the peer by reading local cache.
* @return the enabled state
*/
PeerState getPeerState();
/**
- * Returns the state of peer, if loadFromBackingStore is true, it will load from backing store
- * directly and update its local peer state. otherwise, just return the local cached peer state.
- * @return the enabled state
+ * Get the peer config object
+ * @return the ReplicationPeerConfig for this peer
*/
- PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException;
+ ReplicationPeerConfig getPeerConfig();
/**
* Get the configuration object required to communicate with this peer
* @return configuration object
*/
- public Configuration getConfiguration();
+ Configuration getConfiguration();
/**
* Get replicable (table, cf-list) map of this peer
* @return the replicable (table, cf-list) map
*/
- public Map<TableName, List<String>> getTableCFs();
+ Map<TableName, List<String>> getTableCFs();
/**
* Get replicable namespace set of this peer
* @return the replicable namespaces set
*/
- public Set<String> getNamespaces();
+ Set<String> getNamespaces();
/**
* Get the per node bandwidth upper limit for this peer
* @return the bandwidth upper limit
*/
- public long getPeerBandwidth();
+ long getPeerBandwidth();
/**
* Register a peer config listener to catch the peer config change event.
* @param listener listener to catch the peer config change event.
*/
- public void registerPeerConfigListener(ReplicationPeerConfigListener listener);
+ void registerPeerConfigListener(ReplicationPeerConfigListener listener);
/**
- * Notify all the registered ReplicationPeerConfigListener to update their peer config.
- * @param newPeerConfig the new peer config.
+ * @deprecated Use {@link #registerPeerConfigListener(ReplicationPeerConfigListener)} instead.
*/
- public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig);
-}
+ @Deprecated
+ default void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
+ registerPeerConfigListener(listener);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
new file mode 100644
index 0000000..ed9fe14
--- /dev/null
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+@InterfaceAudience.Private
+public class ReplicationPeerImpl implements ReplicationPeer {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class);
+
+ private final ReplicationPeerStorage peerStorage;
+
+ private final Configuration conf;
+
+ private final String id;
+
+ private volatile ReplicationPeerConfig peerConfig;
+
+ private volatile PeerState peerState;
+
+ private final List<ReplicationPeerConfigListener> peerConfigListeners;
+
+ /**
+ * Constructor that takes all the objects required to communicate with the specified peer, except
+ * for the region server addresses.
+ * @param conf configuration object to this peer
+ * @param id string representation of this peer's identifier
+ * @param peerConfig configuration for the replication peer
+ */
+ public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id,
+ ReplicationPeerConfig peerConfig) {
+ this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf);
+ this.conf = conf;
+ this.peerConfig = peerConfig;
+ this.id = id;
+ this.peerConfigListeners = new ArrayList<>();
+ }
+
+ public void refreshPeerState() throws ReplicationException {
+ this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED;
+ }
+
+ public void refreshPeerConfig() throws ReplicationException {
+ this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig);
+ peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
+ }
+
+ /**
+ * Get the identifier of this peer
+ * @return string representation of the id (short)
+ */
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public PeerState getPeerState() {
+ return peerState;
+ }
+
+ /**
+ * Get the peer config object
+ * @return the ReplicationPeerConfig for this peer
+ */
+ @Override
+ public ReplicationPeerConfig getPeerConfig() {
+ return peerConfig;
+ }
+
+ /**
+ * Get the configuration object required to communicate with this peer
+ * @return configuration object
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get replicable (table, cf-list) map of this peer
+ * @return the replicable (table, cf-list) map
+ */
+ @Override
+ public Map<TableName, List<String>> getTableCFs() {
+ return this.peerConfig.getTableCFsMap();
+ }
+
+ /**
+ * Get replicable namespace set of this peer
+ * @return the replicable namespaces set
+ */
+ @Override
+ public Set<String> getNamespaces() {
+ return this.peerConfig.getNamespaces();
+ }
+
+ @Override
+ public long getPeerBandwidth() {
+ return this.peerConfig.getBandwidth();
+ }
+
+ @Override
+ public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
+ this.peerConfigListeners.add(listener);
+ }
+
+ /**
+ * Parse the raw data from ZK to get a peer's state
+ * @param bytes raw ZK data
+ * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ReplicationProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pbLen = ProtobufUtil.lengthOfPBMagic();
+ ReplicationProtos.ReplicationState.Builder builder =
+ ReplicationProtos.ReplicationState.newBuilder();
+ ReplicationProtos.ReplicationState state;
+ try {
+ ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
+ state = builder.build();
+ return state.getState();
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
deleted file mode 100644
index 5d051a0..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@InterfaceAudience.Private
-public class ReplicationPeerZKImpl extends ReplicationStateZKBase
- implements ReplicationPeer, Abortable, Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
-
- private volatile ReplicationPeerConfig peerConfig;
- private final String id;
- private volatile PeerState peerState;
- private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
- private final Configuration conf;
-
- private final List<ReplicationPeerConfigListener> peerConfigListeners;
-
- /**
- * Constructor that takes all the objects required to communicate with the specified peer, except
- * for the region server addresses.
- * @param conf configuration object to this peer
- * @param id string representation of this peer's identifier
- * @param peerConfig configuration for the replication peer
- */
- public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id,
- ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException {
- super(zkWatcher, conf, abortable);
- this.conf = conf;
- this.peerConfig = peerConfig;
- this.id = id;
- this.peerConfigListeners = new ArrayList<>();
- }
-
- private PeerState readPeerState() throws ReplicationException {
- try {
- byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id));
- this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED;
- } catch (DeserializationException | KeeperException | InterruptedException e) {
- throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ",
- e);
- }
- return this.peerState;
- }
-
- private ReplicationPeerConfig readPeerConfig() throws ReplicationException {
- try {
- byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id));
- if (data != null) {
- this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
- }
- } catch (DeserializationException | KeeperException | InterruptedException e) {
- throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ",
- e);
- }
- return this.peerConfig;
- }
-
- @Override
- public PeerState getPeerState() {
- return peerState;
- }
-
- @Override
- public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException {
- if (loadFromBackingStore) {
- return readPeerState();
- } else {
- return peerState;
- }
- }
-
- /**
- * Get the identifier of this peer
- * @return string representation of the id (short)
- */
- @Override
- public String getId() {
- return id;
- }
-
- /**
- * Get the peer config object
- * @return the ReplicationPeerConfig for this peer
- */
- @Override
- public ReplicationPeerConfig getPeerConfig() {
- return peerConfig;
- }
-
- @Override
- public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore)
- throws ReplicationException {
- if (loadFromBackingStore) {
- return readPeerConfig();
- } else {
- return peerConfig;
- }
- }
-
- /**
- * Get the configuration object required to communicate with this peer
- * @return configuration object
- */
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get replicable (table, cf-list) map of this peer
- * @return the replicable (table, cf-list) map
- */
- @Override
- public Map<TableName, List<String>> getTableCFs() {
- this.tableCFs = peerConfig.getTableCFsMap();
- return this.tableCFs;
- }
-
- /**
- * Get replicable namespace set of this peer
- * @return the replicable namespaces set
- */
- @Override
- public Set<String> getNamespaces() {
- return this.peerConfig.getNamespaces();
- }
-
- @Override
- public long getPeerBandwidth() {
- return this.peerConfig.getBandwidth();
- }
-
- @Override
- public void registerPeerConfigListener(ReplicationPeerConfigListener listener) {
- this.peerConfigListeners.add(listener);
- }
-
- @Override
- public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) {
- for (ReplicationPeerConfigListener listener : this.peerConfigListeners) {
- listener.peerConfigUpdated(newPeerConfig);
- }
- }
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " +
- peerConfig + " was aborted for the following reason(s):" + why, e);
- }
-
- @Override
- public boolean isAborted() {
- // Currently the replication peer is never "Aborted", we just log when the
- // abort method is called.
- return false;
- }
-
- @Override
- public void close() throws IOException {
- // TODO: stop zkw?
- }
-
- /**
- * Parse the raw data from ZK to get a peer's state
- * @param bytes raw ZK data
- * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
- * @throws DeserializationException
- */
- public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ReplicationProtos.ReplicationState.State.ENABLED == state;
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pbLen = ProtobufUtil.lengthOfPBMagic();
- ReplicationProtos.ReplicationState.Builder builder =
- ReplicationProtos.ReplicationState.newBuilder();
- ReplicationProtos.ReplicationState state;
- try {
- ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
- state = builder.build();
- return state.getState();
- } catch (IOException e) {
- throw new DeserializationException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 10936bf..afc19bd 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -116,13 +116,13 @@ public interface ReplicationPeers {
throws ReplicationException;
/**
- * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
+ * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no
* peer has been connected with the given peerId.
* @param peerId id for the peer
* @return ReplicationPeer object
*/
- ReplicationPeer getConnectedPeer(String peerId);
+ ReplicationPeerImpl getConnectedPeer(String peerId);
/**
* Returns the set of peerIds of the clusters that have been connected and have an underlying
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index f2e5647..268ba87 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
// Map of peer clusters keyed by their id
- private Map<String, ReplicationPeerZKImpl> peerClusters;
+ private ConcurrentMap<String, ReplicationPeerImpl> peerClusters;
private final ReplicationQueueStorage queueStorage;
private Abortable abortable;
@@ -232,7 +232,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
String peerStateZNode = getPeerStateNode(id);
try {
- return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
+ return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
throw new ReplicationException(e);
} catch (DeserializationException e) {
@@ -270,7 +270,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public ReplicationPeer getConnectedPeer(String peerId) {
+ public ReplicationPeerImpl getConnectedPeer(String peerId) {
return peerClusters.get(peerId);
}
@@ -425,7 +425,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
public void peerDisconnected(String peerId) {
ReplicationPeer rp = this.peerClusters.get(peerId);
if (rp != null) {
- ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
+ peerClusters.remove(peerId, rp);
}
}
@@ -442,7 +442,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return false;
}
- ReplicationPeerZKImpl peer = null;
+ ReplicationPeerImpl peer = null;
try {
peer = createPeer(peerId);
} catch (Exception e) {
@@ -451,8 +451,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
if (peer == null) {
return false;
}
- ReplicationPeerZKImpl previous =
- ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
+ ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer);
if (previous == null) {
LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
} else {
@@ -495,19 +494,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
* @return object representing the peer
* @throws ReplicationException
*/
- private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
+ private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
if (pair == null) {
return null;
}
Configuration peerConf = pair.getSecond();
- ReplicationPeerZKImpl peer =
- new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable);
+ ReplicationPeerImpl peer =
+ new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst());
// Load peer state and peer config by reading zookeeper directly.
- peer.getPeerState(true);
- peer.getPeerConfig(true);
+ peer.refreshPeerState();
+ peer.refreshPeerConfig();
return peer;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 6fe869c..8905d43 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -312,12 +312,15 @@ public abstract class TestReplicationStateBasic {
rp.disablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
- assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
+ ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE);
+ peer.refreshPeerState();
+ assertEquals(PeerState.DISABLED, peer.getPeerState());
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
- assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
+ peer.refreshPeerState();
+ assertEquals(PeerState.ENABLED, peer.getPeerState());
assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbbda610/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 9b493d9..598357c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
- private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
private ReplicationSourceManager replicationSourceManager;
@@ -49,10 +48,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void disablePeer(String peerId) throws ReplicationException, IOException {
- ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+ ReplicationPeerImpl peer =
+ replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
- PeerState peerState = peer.getPeerState(true);
- LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState);
+ peer.refreshPeerState();
+ LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
@@ -60,10 +60,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void enablePeer(String peerId) throws ReplicationException, IOException {
- ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+ ReplicationPeerImpl peer =
+ replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer != null) {
- PeerState peerState = peer.getPeerState(true);
- LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState);
+ peer.refreshPeerState();
+ LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState());
} else {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
@@ -71,11 +72,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
- ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
+ ReplicationPeerImpl peer =
+ replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId);
if (peer == null) {
throw new ReplicationException("No connected peer found, peerId=" + peerId);
}
- ReplicationPeerConfig rpc = peer.getPeerConfig(true);
- peer.triggerPeerConfigChange(rpc);
+ peer.refreshPeerConfig();
}
}