You are viewing a plain-text version of this content. The canonical (HTML) version is available from the original mailing-list archive.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/04 08:19:54 UTC
[20/38] hbase git commit: HBASE-19617 Remove ReplicationQueues,
use ReplicationQueueStorage directly
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index b6cf54d..4b9ed74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,9 +31,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Interface that defines a replication source
@@ -47,15 +47,10 @@ public interface ReplicationSourceInterface {
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
- * @param replicationQueues
- * @param replicationPeers
* @param server the server for this region server
- * @param peerClusterZnode
- * @param clusterId
- * @throws IOException
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
+ ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b1d82c8..853bafb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -34,19 +34,21 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -60,7 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -95,7 +98,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
- private final ReplicationQueues replicationQueues;
+ private final ReplicationQueueStorage queueStorage;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
@@ -130,7 +133,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Creates a replication manager and sets the watch on all the other registered region servers
- * @param replicationQueues the interface for manipulating replication queues
+ * @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
@@ -140,14 +143,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
- public ReplicationSourceManager(ReplicationQueues replicationQueues,
+ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<>();
- this.replicationQueues = replicationQueues;
+ this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
@@ -184,6 +187,19 @@ public class ReplicationSourceManager implements ReplicationListener {
connection = ConnectionFactory.createConnection(conf);
}
+ @FunctionalInterface
+ private interface ReplicationQueueOperation {
+ void exec() throws ReplicationException;
+ }
+
+ private void abortWhenFail(ReplicationQueueOperation op) {
+ try {
+ op.exec();
+ } catch (ReplicationException e) {
+ server.abort("Failed to operate on replication queue", e);
+ }
+ }
+
/**
* Provide the id of the peer and a log key and this method will figure which
* wal it belongs to and will log, for this region server, the current
@@ -195,12 +211,13 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position,
- boolean queueRecovered, boolean holdLogInZK) {
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+ boolean holdLogInZK) {
String fileName = log.getName();
- this.replicationQueues.setLogPosition(id, fileName, position);
+ abortWhenFail(
+ () -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
if (holdLogInZK) {
- return;
+ return;
}
cleanOldLogs(fileName, id, queueRecovered);
}
@@ -227,36 +244,59 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
- }
+ }
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
- LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
+ }
for (String wal : walSet) {
- this.replicationQueues.removeLog(id, wal);
+ abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
}
walSet.clear();
}
+ private void adoptAbandonedQueues() {
+ List<ServerName> currentReplicators = null;
+ try {
+ currentReplicators = queueStorage.getListOfReplicators();
+ } catch (ReplicationException e) {
+ server.abort("Failed to get all replicators", e);
+ return;
+ }
+ if (currentReplicators == null || currentReplicators.isEmpty()) {
+ return;
+ }
+ List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
+ .map(ServerName::valueOf).collect(Collectors.toList());
+ LOG.info(
+ "Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
+
+ // Look if there's anything to process after a restart
+ for (ServerName rs : currentReplicators) {
+ if (!otherRegionServers.contains(rs)) {
+ transferQueues(rs);
+ }
+ }
+ }
+
/**
- * Adds a normal source per registered peer cluster and tries to process all
- * old region server wal queues
+ * Adds a normal source per registered peer cluster and tries to process all old region server wal
+ * queues
+ * <p>
+ * The returned future tracks the adoptAbandonedQueues task.
*/
- void init() throws IOException, ReplicationException {
+ Future<?> init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
// when a peer was added before replication for bulk loaded data was enabled.
- this.replicationQueues.addPeerToHFileRefs(id);
+ this.queueStorage.addPeerToHFileRefs(id);
}
}
- AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
- try {
- this.executor.execute(adoptionWorker);
- } catch (RejectedExecutionException ex) {
- LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
- }
+ return this.executor.submit(this::adoptAbandonedQueues);
}
/**
@@ -264,15 +304,12 @@ public class ReplicationSourceManager implements ReplicationListener {
* need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
* @return the source that was created
- * @throws IOException
*/
@VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
- ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
- this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
- walFileLengthProvider);
+ ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
synchronized (this.walsById) {
this.sources.add(src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -287,11 +324,10 @@ public class ReplicationSourceManager implements ReplicationListener {
logs.add(name);
walsByGroup.put(walPrefix, logs);
try {
- this.replicationQueues.addLog(id, name);
+ this.queueStorage.addWAL(server.getServerName(), id, name);
} catch (ReplicationException e) {
- String message =
- "Cannot add log to queue when creating a new source, queueId=" + id
- + ", filename=" + name;
+ String message = "Cannot add log to queue when creating a new source, queueId=" + id +
+ ", filename=" + name;
server.stop(message);
throw e;
}
@@ -316,7 +352,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param peerId Id of the peer cluster queue of wals to delete
*/
public void deleteSource(String peerId, boolean closeConnection) {
- this.replicationQueues.removeQueue(peerId);
+ abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
if (closeConnection) {
this.replicationPeers.peerDisconnected(peerId);
}
@@ -376,8 +412,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
@VisibleForTesting
- List<String> getAllQueues() {
- return replicationQueues.getAllQueues();
+ List<String> getAllQueues() throws ReplicationException {
+ return queueStorage.getAllQueues(server.getServerName());
}
void preLogRoll(Path newLog) throws IOException {
@@ -411,10 +447,10 @@ public class ReplicationSourceManager implements ReplicationListener {
synchronized (replicationPeers) {
for (String id : replicationPeers.getConnectedPeerIds()) {
try {
- this.replicationQueues.addLog(id, logName);
+ this.queueStorage.addWAL(server.getServerName(), id, logName);
} catch (ReplicationException e) {
- throw new IOException("Cannot add log to replication queue"
- + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
+ throw new IOException("Cannot add log to replication queue" +
+ " when creating a new source, queueId=" + id + ", filename=" + logName, e);
}
}
}
@@ -461,19 +497,11 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Factory method to create a replication source
- * @param conf the configuration to use
- * @param fs the file system to use
- * @param manager the manager to use
- * @param server the server object for this region server
* @param peerId the id of the peer cluster
* @return the created source
- * @throws IOException
*/
- private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, ReplicationQueues replicationQueues,
- ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
- ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
- WALFileLengthProvider walFileLengthProvider) throws IOException {
+ private ReplicationSourceInterface getReplicationSource(String peerId,
+ ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@@ -490,9 +518,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
- @SuppressWarnings("rawtypes")
- Class c = Class.forName(replicationEndpointImpl);
- replicationEndpoint = (ReplicationEndpoint) c.newInstance();
+ replicationEndpoint = Class.forName(replicationEndpointImpl)
+ .asSubclass(ReplicationEndpoint.class).newInstance();
if(rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint = rsServerHost
.postCreateReplicationEndPoint(replicationEndpoint);
@@ -509,7 +536,7 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
- src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
+ src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId,
replicationEndpoint, walFileLengthProvider, metrics);
// init replication endpoint
@@ -520,21 +547,21 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Transfer all the queues of the specified to this region server.
- * First it tries to grab a lock and if it works it will move the
- * znodes and finally will delete the old znodes.
- *
+ * Transfer all the queues of the dead RS to this region server. First it tries to grab a lock
+ * and if it works it will move the znodes and finally will delete the old znodes.
+ * <p>
* It creates one old source for any type of source of the old rs.
- * @param rsZnode
*/
- private void transferQueues(String rsZnode) {
- NodeFailoverWorker transfer =
- new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
- this.clusterId);
+ private void transferQueues(ServerName deadRS) {
+ if (server.getServerName().equals(deadRS)) {
+ // it's just us, give up
+ return;
+ }
+ NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
- LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
+ LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
}
}
@@ -571,7 +598,7 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id);
if (replicationForBulkLoadDataEnabled) {
- this.replicationQueues.addPeerToHFileRefs(id);
+ this.queueStorage.addPeerToHFileRefs(id);
}
}
}
@@ -624,12 +651,12 @@ public class ReplicationSourceManager implements ReplicationListener {
deleteSource(id, true);
}
// Remove HFile Refs znode from zookeeper
- this.replicationQueues.removePeerFromHFileRefs(id);
+ abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
}
@Override
public void regionServerRemoved(String regionserver) {
- transferQueues(regionserver);
+ transferQueues(ServerName.valueOf(regionserver));
}
/**
@@ -638,37 +665,21 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
class NodeFailoverWorker extends Thread {
- private String rsZnode;
- private final ReplicationQueues rq;
- private final ReplicationPeers rp;
- private final UUID clusterId;
+ private final ServerName deadRS;
- /**
- * @param rsZnode
- */
- public NodeFailoverWorker(String rsZnode) {
- this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
- }
-
- public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
- final ReplicationPeers replicationPeers, final UUID clusterId) {
- super("Failover-for-"+rsZnode);
- this.rsZnode = rsZnode;
- this.rq = replicationQueues;
- this.rp = replicationPeers;
- this.clusterId = clusterId;
+ @VisibleForTesting
+ public NodeFailoverWorker(ServerName deadRS) {
+ super("Failover-for-" + deadRS);
+ this.deadRS = deadRS;
}
@Override
public void run() {
- if (this.rq.isThisOurRegionServer(rsZnode)) {
- return;
- }
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
Thread.sleep(sleepBeforeFailover +
- (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
+ (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
@@ -679,25 +690,30 @@ public class ReplicationSourceManager implements ReplicationListener {
return;
}
Map<String, Set<String>> newQueues = new HashMap<>();
- List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
- while (peers != null && !peers.isEmpty()) {
- Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
- peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
- long sleep = sleepBeforeFailover/2;
- if (peer != null) {
- newQueues.put(peer.getFirst(), peer.getSecond());
- sleep = sleepBeforeFailover;
+ try {
+ List<String> peers = queueStorage.getAllQueues(deadRS);
+ while (!peers.isEmpty()) {
+ Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
+ peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
+ long sleep = sleepBeforeFailover / 2;
+ if (!peer.getSecond().isEmpty()) {
+ newQueues.put(peer.getFirst(), peer.getSecond());
+ sleep = sleepBeforeFailover;
+ }
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting before transferring a queue.");
+ Thread.currentThread().interrupt();
+ }
+ peers = queueStorage.getAllQueues(deadRS);
}
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting before transferring a queue.");
- Thread.currentThread().interrupt();
+ if (!peers.isEmpty()) {
+ queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
- peers = rq.getUnClaimedQueueIds(rsZnode);
- }
- if (peers != null) {
- rq.removeReplicatorIfQueueIsEmpty(rsZnode);
+ } catch (ReplicationException e) {
+ server.abort("Failed to claim queue from dead regionserver", e);
+ return;
}
// Copying over the failed queue is completed.
if (newQueues.isEmpty()) {
@@ -722,8 +738,8 @@ public class ReplicationSourceManager implements ReplicationListener {
+ ex);
}
if (peer == null || peerConfig == null) {
- LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
- replicationQueues.removeQueue(peerId);
+ LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
+ abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue;
}
// track sources in walsByIdRecoveredQueues
@@ -740,13 +756,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
// enqueue sources
- ReplicationSourceInterface src =
- getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
- server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
+ ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
- if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
+ if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src);
continue;
@@ -765,29 +779,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- class AdoptAbandonedQueuesWorker extends Thread{
-
- public AdoptAbandonedQueuesWorker() {}
-
- @Override
- public void run() {
- List<String> currentReplicators = replicationQueues.getListOfReplicators();
- if (currentReplicators == null || currentReplicators.isEmpty()) {
- return;
- }
- List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
- LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
- + otherRegionServers);
-
- // Look if there's anything to process after a restart
- for (String rs : currentReplicators) {
- if (!otherRegionServers.contains(rs)) {
- transferQueues(rs);
- }
- }
- }
- }
-
/**
* Get the directory where wals are archived
* @return the directory where wals are archived
@@ -846,7 +837,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
public void cleanUpHFileRefs(String peerId, List<String> files) {
- this.replicationQueues.removeHFileRefs(peerId, files);
+ abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
+ }
+
+ int activeFailoverTaskCount() {
+ return executor.getActiveCount();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 21b8ac5..9ec244a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -37,22 +36,19 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * In a scenario of Replication based Disaster/Recovery, when hbase
- * Master-Cluster crashes, this tool is used to sync-up the delta from Master to
- * Slave using the info from ZooKeeper. The tool will run on Master-Cluser, and
- * assume ZK, Filesystem and NetWork still available after hbase crashes
+ * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
+ * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
+ * will run on Master-Cluster, and assumes ZK, Filesystem and Network are still available after
+ * hbase crashes.
*
+ * <pre>
* hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
+ * </pre>
*/
-
public class ReplicationSyncUp extends Configured implements Tool {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationSyncUp.class.getName());
-
private static Configuration conf;
private static final long SLEEP_TIME = 10000;
@@ -105,13 +101,14 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.out.println("Start Replication Server start");
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
- manager.init();
+ manager.init().get();
try {
- int numberOfOldSource = 1; // default wait once
- while (numberOfOldSource > 0) {
+ while (manager.activeFailoverTaskCount() > 0) {
+ Thread.sleep(SLEEP_TIME);
+ }
+ while (manager.getOldSources().size() > 0) {
Thread.sleep(SLEEP_TIME);
- numberOfOldSource = manager.getOldSources().size();
}
} catch (InterruptedException e) {
System.err.println("didn't wait long enough:" + e);
@@ -121,7 +118,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
manager.join();
zkw.close();
- return (0);
+ return 0;
}
static class DummyServer implements Server {
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 1e75959..2de6608 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -44,9 +44,8 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -111,9 +110,8 @@ public class TestLogsCleaner {
Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer();
- ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
- new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
- repQueues.init(server.getServerName().toString());
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
@@ -144,7 +142,7 @@ public class TestLogsCleaner {
// Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
// files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
if (i % (30 / 3) == 1) {
- repQueues.addLog(fakeMachineName, fileName.getName());
+ queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
LOG.info("Replication log file: " + fileName);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index f83695f..8802e36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -72,19 +71,16 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {
- private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Server server;
- private static ReplicationQueues rq;
+ private static ReplicationQueueStorage rq;
private static ReplicationPeers rp;
private static final String peerId = "TestReplicationHFileCleaner";
private static Configuration conf = TEST_UTIL.getConfiguration();
static FileSystem fs = null;
Path root;
- /**
- * @throws java.lang.Exception
- */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
@@ -93,20 +89,10 @@ public class TestReplicationHFileCleaner {
Replication.decorateMasterConfiguration(conf);
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
rp.init();
- rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
- rq.init(server.getServerName().toString());
- try {
- fs = FileSystem.get(conf);
- } finally {
- if (fs != null) {
- fs.close();
- }
- }
+ rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
+ fs = FileSystem.get(conf);
}
- /**
- * @throws java.lang.Exception
- */
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
index 8178266..2ad8bd7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -26,10 +26,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -49,14 +47,12 @@ public class TestReplicationZKNodeCleaner {
private final Configuration conf;
private final ZKWatcher zkw;
- private final ReplicationQueues repQueues;
+ private final ReplicationQueueStorage repQueues;
public TestReplicationZKNodeCleaner() throws Exception {
conf = TEST_UTIL.getConfiguration();
zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
- repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
- zkw));
- assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+ repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
}
@BeforeClass
@@ -72,9 +68,8 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleaner() throws Exception {
- repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
- repQueues.addLog(ID_ONE, "file1");
+ repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
@@ -84,7 +79,7 @@ public class TestReplicationZKNodeCleaner {
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
// add a recovery queue for ID_TWO which isn't exist
- repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+ repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
@@ -100,11 +95,10 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleanerChore() throws Exception {
- repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
- repQueues.addLog(ID_ONE, "file1");
+ repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
// add a recovery queue for ID_TWO which isn't exist
- repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+ repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
// Wait the cleaner chore to run
Thread.sleep(20000);
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 7ea79f9..14c5e56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
AtomicBoolean startup = new AtomicBoolean(false);
+
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+ ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.manager = manager;
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 0a602ad..0313b3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -64,7 +64,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
@Before
public void setUp() throws Exception {
-
HColumnDescriptor fam;
t1_syncupSource = new HTableDescriptor(t1_su);
@@ -100,7 +99,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
* check's gone Also check the puts and deletes are not replicated back to
* the originating cluster.
*/
- @Test(timeout = 300000)
+ @Test
public void testSyncUpTool() throws Exception {
/**
@@ -176,7 +175,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
* verify correctly replicated to Slave
*/
mimicSyncUpAfterPut();
-
}
protected void setupReplication() throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7b07d15..f6724df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -40,7 +40,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -67,10 +66,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -95,11 +94,13 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
/**
* An abstract class that tests ReplicationSourceManager. Classes that extend this class should
* set up the proper config for this class and initialize the proper cluster using
@@ -328,18 +329,14 @@ public abstract class TestReplicationSourceManager {
@Test
public void testClaimQueues() throws Exception {
- final Server server = new DummyServer("hostname0.example.org");
-
-
- ReplicationQueues rq =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
- server.getZooKeeper()));
- rq.init(server.getServerName().toString());
+ Server server = new DummyServer("hostname0.example.org");
+ ReplicationQueueStorage rq = ReplicationStorageFactory
+ .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
- rq.addLog("1", file);
+ rq.addWAL(server.getServerName(), "1", file);
}
// create 3 DummyServers
Server s1 = new DummyServer("dummyserver1.example.org");
@@ -347,12 +344,9 @@ public abstract class TestReplicationSourceManager {
Server s3 = new DummyServer("dummyserver3.example.org");
// create 3 DummyNodeFailoverWorkers
- DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
- server.getServerName().getServerName(), s1);
- DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
- server.getServerName().getServerName(), s2);
- DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
- server.getServerName().getServerName(), s3);
+ DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
+ DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
+ DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
latch = new CountDownLatch(3);
// start the threads
@@ -371,11 +365,9 @@ public abstract class TestReplicationSourceManager {
@Test
public void testCleanupFailoverQueues() throws Exception {
- final Server server = new DummyServer("hostname1.example.org");
- ReplicationQueues rq =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
- server.getZooKeeper()));
- rq.init(server.getServerName().toString());
+ Server server = new DummyServer("hostname1.example.org");
+ ReplicationQueueStorage rq = ReplicationStorageFactory
+ .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<>();
String group = "testgroup";
@@ -384,19 +376,14 @@ public abstract class TestReplicationSourceManager {
files.add(file1);
files.add(file2);
for (String file : files) {
- rq.addLog("1", file);
+ rq.addWAL(server.getServerName(), "1", file);
}
Server s1 = new DummyServer("dummyserver1.example.org");
- ReplicationQueues rq1 =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
- s1.getZooKeeper()));
- rq1.init(s1.getServerName().toString());
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
rp1.init();
NodeFailoverWorker w1 =
- manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
- new Long(1), new Long(2)));
+ manager.new NodeFailoverWorker(server.getServerName());
w1.run();
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
@@ -408,17 +395,16 @@ public abstract class TestReplicationSourceManager {
@Test
public void testCleanupUnknownPeerZNode() throws Exception {
- final Server server = new DummyServer("hostname2.example.org");
- ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
- new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
- rq.init(server.getServerName().toString());
+ Server server = new DummyServer("hostname2.example.org");
+ ReplicationQueueStorage rq = ReplicationStorageFactory
+ .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
// add log to an unknown peer
String group = "testgroup";
- rq.addLog("2", group + ".log1");
- rq.addLog("2", group + ".log2");
+ rq.addWAL(server.getServerName(), "2", group + ".log1");
+ rq.addWAL(server.getServerName(), "2", group + ".log2");
- NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
+ NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
w1.run();
// The log of the unknown peer should be removed from zk
@@ -481,10 +467,8 @@ public abstract class TestReplicationSourceManager {
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
try {
DummyServer server = new DummyServer();
- final ReplicationQueues rq =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
- server.getConfiguration(), server, server.getZooKeeper()));
- rq.init(server.getServerName().toString());
+ ReplicationQueueStorage rq = ReplicationStorageFactory
+ .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation",
@@ -498,11 +482,11 @@ public abstract class TestReplicationSourceManager {
assertNull(manager.getSource(peerId));
// Create a replication queue for the fake peer
- rq.addLog(peerId, "FakeFile");
+ rq.addWAL(server.getServerName(), peerId, "FakeFile");
// Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
removePeerAndWait(peerId);
- assertFalse(rq.getAllQueues().contains(peerId));
+ assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
removePeerAndWait(peerId);
@@ -625,11 +609,12 @@ public abstract class TestReplicationSourceManager {
}
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
- @Override public boolean evaluate() throws Exception {
+ @Override
+ public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
- return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
- && (!peers.contains(peerId))
- && manager.getSource(peerId) == null;
+ return (!manager.getAllQueues().contains(peerId)) &&
+ (rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
+ manager.getSource(peerId) == null;
}
});
}
@@ -672,25 +657,24 @@ public abstract class TestReplicationSourceManager {
static class DummyNodeFailoverWorker extends Thread {
private Map<String, Set<String>> logZnodesMap;
Server server;
- private String deadRsZnode;
- ReplicationQueues rq;
+ private ServerName deadRS;
+ ReplicationQueueStorage rq;
- public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
- this.deadRsZnode = znode;
+ public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
+ this.deadRS = deadRS;
this.server = s;
- this.rq =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
- server.getZooKeeper()));
- this.rq.init(this.server.getServerName().toString());
+ this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
+ server.getConfiguration());
}
@Override
public void run() {
try {
logZnodesMap = new HashMap<>();
- List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
- for(String queue:queues){
- Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
+ List<String> queues = rq.getAllQueues(deadRS);
+ for (String queue : queues) {
+ Pair<String, SortedSet<String>> pair =
+ rq.claimQueue(deadRS, queue, server.getServerName());
if (pair != null) {
logZnodesMap.put(pair.getFirst(), pair.getSecond());
}
@@ -729,7 +713,7 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
+ ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");
http://git-wip-us.apache.org/repos/asf/hbase/blob/72f1e971/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index aeab8b0..c6d9eef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
@@ -41,8 +40,9 @@ import org.junit.experimental.categories.Category;
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
* TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
*/
-@Category({ReplicationTests.class, MediumTests.class})
+@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
@@ -58,16 +58,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {
- final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
- ReplicationQueues repQueues =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
- server.getZooKeeper()));
- repQueues.init(server.getServerName().toString());
+ Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
- repQueues.addLog("1", file);
+ queueStorage.addWAL(server.getServerName(), "1", file);
}
// create 3 DummyServers
@@ -76,30 +74,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially
- ReplicationQueues rq1 =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
- s1.getZooKeeper()));
- rq1.init(s1.getServerName().toString());
- String serverName = server.getServerName().getServerName();
- List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
- rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
- rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
- ReplicationQueues rq2 =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
- s2.getZooKeeper()));
- rq2.init(s2.getServerName().toString());
- serverName = s1.getServerName().getServerName();
- unclaimed = rq2.getUnClaimedQueueIds(serverName);
- rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
- rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
- ReplicationQueues rq3 =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
- s3.getZooKeeper()));
- rq3.init(s3.getServerName().toString());
- serverName = s2.getServerName().getServerName();
- unclaimed = rq3.getUnClaimedQueueIds(serverName);
- String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
- rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
+ ServerName serverName = server.getServerName();
+ List<String> unclaimed = queueStorage.getAllQueues(serverName);
+ queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
+ queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+ serverName = s1.getServerName();
+ unclaimed = queueStorage.getAllQueues(serverName);
+ queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
+ queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
+ serverName = s2.getServerName();
+ unclaimed = queueStorage.getAllQueues(serverName);
+ String queue3 =
+ queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
+ queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
+
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
// verify