You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/29 12:36:05 UTC
[38/50] [abbrv] hbase git commit: HBASE-19633 Clean up the
replication queues in the postPeerModification stage when removing a peer
HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/196b151d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/196b151d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/196b151d
Branch: refs/heads/HBASE-19397-branch-2
Commit: 196b151d860ef668110e68648407bb0f41d8513e
Parents: ad6af56
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 2 09:57:23 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Jan 29 20:34:11 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfig.java | 2 +-
.../replication/VerifyReplication.java | 34 ++++++++++-------
.../hbase/replication/ReplicationPeers.java | 32 ++++++----------
.../replication/ZKReplicationQueueStorage.java | 3 +-
.../replication/ZKReplicationStorageBase.java | 4 +-
.../replication/TestReplicationStateBasic.java | 10 +----
.../master/replication/AddPeerProcedure.java | 5 +--
.../replication/DisablePeerProcedure.java | 3 +-
.../master/replication/EnablePeerProcedure.java | 3 +-
.../master/replication/ModifyPeerProcedure.java | 34 +++++++++--------
.../replication/RefreshPeerProcedure.java | 17 ++++-----
.../master/replication/RemovePeerProcedure.java | 7 ++--
.../replication/ReplicationPeerManager.java | 31 +++++++++++++++-
.../replication/UpdatePeerConfigProcedure.java | 3 +-
.../RemoteProcedureResultReporter.java | 3 +-
.../regionserver/RefreshPeerCallable.java | 5 ++-
.../regionserver/ReplicationSourceManager.java | 39 +++++++-------------
.../TestReplicationAdminUsingProcedure.java | 7 ++--
18 files changed, 124 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index b80ee16..fdae288 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -27,8 +27,8 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* A configuration for the replication peer cluster.
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index f0070f0..fe45762 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce.replication;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,14 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,6 +66,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -333,19 +334,24 @@ public class VerifyReplication extends Configured implements Tool {
final Configuration conf, String peerId) throws IOException {
ZKWatcher localZKW = null;
try {
- localZKW = new ZKWatcher(conf, "VerifyReplication",
- new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
-
- ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
- rp.init();
+ localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
+ @Override
+ public void abort(String why, Throwable e) {
+ }
- return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+ });
+ ReplicationPeerStorage storage =
+ ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+ ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
+ return Pair.newPair(peerConfig,
+ ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
} catch (ReplicationException e) {
- throw new IOException(
- "An error occurred while trying to connect to the remove peer cluster", e);
+ throw new IOException("An error occurred while trying to connect to the remove peer cluster",
+ e);
} finally {
if (localZKW != null) {
localZKW.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index e58482e..422801b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -39,20 +37,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
@InterfaceAudience.Private
public class ReplicationPeers {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
-
private final Configuration conf;
// Map of peer clusters keyed by their id
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
- protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+ ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
}
+ public Configuration getConf() {
+ return conf;
+ }
+
public void init() throws ReplicationException {
// Loading all existing peerIds into peer cache.
for (String peerId : this.peerStorage.listPeerIds()) {
@@ -120,22 +120,13 @@ public class ReplicationPeers {
return peerCache.keySet();
}
- public ReplicationPeerConfig getPeerConfig(String peerId) {
- ReplicationPeer replicationPeer = this.peerCache.get(peerId);
- if (replicationPeer == null) {
- throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
- }
- return replicationPeer.getPeerConfig();
- }
-
- public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
- ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
-
+ public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+ Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
- otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+ otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
- throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
+ throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}
if (!peerConfig.getConfiguration().isEmpty()) {
@@ -179,8 +170,9 @@ public class ReplicationPeers {
>>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
*/
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
- ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+ ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
- return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
+ return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
+ peerConfig);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 41f50d8..ee237f2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
@@ -50,7 +49,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* ZK based replication queue storage.
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index d09a56b..2321e4f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 2589199..07c6c15 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -238,12 +237,6 @@ public abstract class TestReplicationStateBasic {
} catch (ReplicationException e) {
}
- try {
- assertNull(rp.getPeerClusterConfiguration("bogus"));
- fail("Should have thrown an ReplicationException when passed a bogus peerId");
- } catch (ReplicationException e) {
- }
-
assertNumberOfPeers(0);
// Add some peers
@@ -258,7 +251,8 @@ public abstract class TestReplicationStateBasic {
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
- assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+ assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+ .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
rp.getPeerStorage().removePeer(ID_ONE);
rp.removePeer(ID_ONE);
assertNumberOfPeers(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index a4f9b32..f0f7704 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -74,8 +73,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
- LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId +
- ", config " + peerConfig);
+ LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
+ peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
index 10e35a8..0871575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
- LOG.info("Successfully disabled peer " + peerId);
+ LOG.info("Successfully disabled peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postDisableReplicationPeer(peerId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
index f2a9f01..890462f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
- LOG.info("Successfully enabled peer " + peerId);
+ LOG.info("Successfully enabled peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postEnableReplicationPeer(peerId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index a682606..c225619 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -84,10 +83,13 @@ public abstract class ModifyPeerProcedure
* Called before we finish the procedure. The implementation can do some logging work, and also
* call the coprocessor hook if any.
* <p>
- * Notice that, since we have already done the actual work, throwing exception here will not fail
- * this procedure, we will just ignore it and finish the procedure as suceeded.
+ * Notice that, since we have already done the actual work, throwing {@code IOException} here will
+ * not fail this procedure; we will just ignore it and finish the procedure as succeeded. If
+ * {@code ReplicationException} is thrown we will retry, since this usually means we failed to
+ * update the peer storage.
*/
- protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException;
+ protected abstract void postPeerModification(MasterProcedureEnv env)
+ throws IOException, ReplicationException;
private void releaseLatch() {
ProcedurePrepareLatch.releaseLatch(latch, this);
@@ -101,16 +103,14 @@ public abstract class ModifyPeerProcedure
try {
prePeerModification(env);
} catch (IOException e) {
- LOG.warn(
- getClass().getName() + " failed to call CP hook or the pre check is failed for peer " +
- peerId + ", mark the procedure as failure and give up",
- e);
+ LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
+ "mark the procedure as failure and give up", getClass().getName(), peerId, e);
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
- LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
- ", retry", e);
+ LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
+ peerId, e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
@@ -119,8 +119,8 @@ public abstract class ModifyPeerProcedure
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
- LOG.warn(
- getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
+ LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
+ e);
throw new ProcedureYieldException();
}
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
@@ -134,9 +134,13 @@ public abstract class ModifyPeerProcedure
case POST_PEER_MODIFICATION:
try {
postPeerModification(env);
+ } catch (ReplicationException e) {
+ LOG.warn("{} failed to call postPeerModification for peer {}, retry",
+ getClass().getName(), peerId, e);
+ throw new ProcedureYieldException();
} catch (IOException e) {
- LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
- ", ignore since the procedure has already done", e);
+ LOG.warn("{} failed to call post CP hook for peer {}, " +
+ "ignore since the procedure has already done", getClass().getName(), peerId, e);
}
releaseLatch();
return Flow.NO_MORE_STATE;
@@ -175,7 +179,7 @@ public abstract class ModifyPeerProcedure
throws IOException, InterruptedException {
if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
// actually the peer related operations has no rollback, but if we haven't done any
- // modifications on the peer storage, we can just return.
+ // modifications on the peer storage yet, we can just return.
return;
}
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index ba4285f..1253ef9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
@@ -122,17 +121,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
private void complete(MasterProcedureEnv env, Throwable error) {
if (event == null) {
- LOG.warn("procedure event for " + getProcId() +
- " is null, maybe the procedure is created when recovery",
- new Exception());
+ LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+ getProcId());
return;
}
if (error != null) {
- LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed",
- error);
+ LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
this.succ = false;
} else {
- LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded");
+ LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
this.succ = true;
}
@@ -168,9 +165,9 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
dispatched = false;
}
if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
- LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
- " to " + targetServer + ", this usually because the server is already dead," +
- " give up and mark the procedure as complete");
+ LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " +
+ "this usually because the server is already dead, " +
+ "give up and mark the procedure as complete", peerId, type, targetServer);
return null;
}
dispatched = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 6e9c384..64faf2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -61,8 +60,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
}
@Override
- protected void postPeerModification(MasterProcedureEnv env) throws IOException {
- LOG.info("Successfully removed peer " + peerId);
+ protected void postPeerModification(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
+ env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+ LOG.info("Successfully removed peer {}", peerId);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postRemoveReplicationPeer(peerId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index b6732d7..1414d22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -217,6 +216,36 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
+ private void removeAllQueues0(String peerId) throws ReplicationException {
+ for (ServerName replicator : queueStorage.getListOfReplicators()) {
+ List<String> queueIds = queueStorage.getAllQueues(replicator);
+ for (String queueId : queueIds) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ if (queueInfo.getPeerId().equals(peerId)) {
+ queueStorage.removeQueue(replicator, queueId);
+ }
+ }
+ queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+ }
+ }
+
+ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+ // Here we need two passes to address the problem of claimQueue. A claimQueue may still be
+ // ongoing when the refresh peer config procedure is done. If an RS which has already been
+ // scanned claims the queue of an RS which has not been scanned yet, we will miss that queue in
+ // the scan here, and if the RS which has claimed the queue crashes before creating the recovered
+ // source, then the queue will be left there until another RS detects the crash and helps
+ // to remove the queue.
+ // A two-pass scan can solve the problem. The queue will not disappear during the
+ // claiming; it will be either under the old RS or under the new RS, and a queue can only be
+ // claimed once after the refresh peer procedure is done (as the next claim queue will just delete
+ // it), so we can make sure that a two-pass scan will finally find the queue and remove it,
+ // unless it has already been removed by others.
+ removeAllQueues0(peerId);
+ removeAllQueues0(peerId);
+ queueStorage.removePeerFromHFileRefs(peerId);
+ }
+
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
checkClusterKey(peerConfig.getClusterKey());
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index a43532d..3497447 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -70,7 +69,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void postPeerModification(MasterProcedureEnv env) throws IOException {
- LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig);
+ LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
index e4be422..ac3e95a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index c3f33aa..7ada24b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.log4j.Logger;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 1359575..c289c74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -306,9 +305,8 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
@VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
- ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
ReplicationPeer peer = replicationPeers.getPeer(id);
- ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
+ ReplicationSourceInterface src = getReplicationSource(id, peer);
synchronized (this.walsById) {
this.sources.add(src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -499,8 +497,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param peerId the id of the peer cluster
* @return the created source
*/
- private ReplicationSourceInterface getReplicationSource(String peerId,
- ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
+ private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer)
+ throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@@ -512,24 +510,24 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationEndpoint replicationEndpoint = null;
try {
- String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+ String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
replicationEndpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).newInstance();
- if(rsServerHost != null) {
- ReplicationEndpoint newReplicationEndPoint = rsServerHost
- .postCreateReplicationEndPoint(replicationEndpoint);
- if(newReplicationEndPoint != null) {
+ if (rsServerHost != null) {
+ ReplicationEndpoint newReplicationEndPoint =
+ rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+ if (newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
} catch (Exception e) {
- LOG.warn("Passed replication endpoint implementation throws errors"
- + " while initializing ReplicationSource for peer: " + peerId, e);
+ LOG.warn("Passed replication endpoint implementation throws errors" +
+ " while initializing ReplicationSource for peer: " + peerId, e);
throw new IOException(e);
}
@@ -539,8 +537,8 @@ public class ReplicationSourceManager implements ReplicationListener {
replicationEndpoint, walFileLengthProvider, metrics);
// init replication endpoint
- replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
- fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
+ replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs,
+ peerId, clusterId, peer, metrics, tableDescriptors, server));
return src;
}
@@ -736,17 +734,6 @@ public class ReplicationSourceManager implements ReplicationListener {
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue;
}
-
- ReplicationPeerConfig peerConfig = null;
- try {
- peerConfig = replicationPeers.getPeerConfig(actualPeerId);
- } catch (Exception e) {
- LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
- + ", failed to read peer config", e);
- abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
- continue;
- }
-
// track sources in walsByIdRecoveredQueues
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -761,7 +748,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
// enqueue sources
- ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
+ ReplicationSourceInterface src = getReplicationSource(peerId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/196b151d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
index b09a8a7..1300376 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java
@@ -15,19 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.client.replication;
import java.io.IOException;
-
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -39,6 +35,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
@Category({ MediumTests.class, ClientTests.class })
public class TestReplicationAdminUsingProcedure extends TestReplicationBase {