You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/09 13:00:12 UTC
[39/40] hbase git commit: HBASE-19923 Reset peer state and config
when refresh replication source failed
HBASE-19923 Reset peer state and config when refresh replication source failed
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/55d0d302
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/55d0d302
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/55d0d302
Branch: refs/heads/branch-2
Commit: 55d0d3026a3d2733fbffd48237c5d7729e1e234d
Parents: d1e775e
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Feb 6 14:58:39 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 9 20:55:48 2018 +0800
----------------------------------------------------------------------
.../hbase/replication/ReplicationPeerImpl.java | 4 ++--
.../regionserver/PeerProcedureHandlerImpl.java | 24 ++++++++++++++++----
2 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/55d0d302/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index 604e0bb..d656466 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -54,11 +54,11 @@ public class ReplicationPeerImpl implements ReplicationPeer {
this.peerConfigListeners = new ArrayList<>();
}
- void setPeerState(boolean enabled) {
+ public void setPeerState(boolean enabled) {
this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED;
}
- void setPeerConfig(ReplicationPeerConfig peerConfig) {
+ public void setPeerConfig(ReplicationPeerConfig peerConfig) {
this.peerConfig = peerConfig;
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/55d0d302/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index ce8fdae..a02d181 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -62,18 +62,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
private void refreshPeerState(String peerId) throws ReplicationException, IOException {
PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
+ ReplicationPeerImpl peer = null;
+ PeerState oldState = null;
+ boolean success = false;
try {
- ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
- PeerState oldState = peer.getPeerState();
+ oldState = peer.getPeerState();
newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
// RS need to start work with the new replication state change
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
replicationSourceManager.refreshSources(peerId);
}
+ success = true;
} finally {
+ if (!success && peer != null) {
+ // Reset peer state if refresh source failed
+ peer.setPeerState(oldState.equals(PeerState.ENABLED));
+ }
peerLock.unlock();
}
}
@@ -91,19 +99,27 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
+ ReplicationPeerImpl peer = null;
+ ReplicationPeerConfig oldConfig = null;
+ boolean success = false;
try {
- ReplicationPeerImpl peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
- ReplicationPeerConfig oldConfig = peer.getPeerConfig();
+ oldConfig = peer.getPeerConfig();
ReplicationPeerConfig newConfig =
replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
// RS need to start work with the new replication config change
if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
replicationSourceManager.refreshSources(peerId);
}
+ success = true;
} finally {
+ if (!success && peer != null) {
+ // Reset peer config if refresh source failed
+ peer.setPeerConfig(oldConfig);
+ }
peerLock.unlock();
}
}