Posted to commits@hbase.apache.org by ec...@apache.org on 2015/10/28 20:10:28 UTC
[2/2] hbase git commit: HBASE-6617 ReplicationSourceManager should be able to track multiple WAL paths (Yu Li)
HBASE-6617 ReplicationSourceManager should be able to track multiple WAL paths (Yu Li)
Signed-off-by: Elliott Clark <ec...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/242adad5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/242adad5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/242adad5
Branch: refs/heads/branch-1.2
Commit: 242adad59f2d260e5fb264341d5ad4e2eb73c345
Parents: af1d176
Author: tedyu <yu...@gmail.com>
Authored: Fri Sep 11 09:30:58 2015 -0700
Committer: Elliott Clark <ec...@apache.org>
Committed: Wed Oct 28 11:57:40 2015 -0700
----------------------------------------------------------------------
.../hbase/regionserver/HRegionServer.java | 9 +
.../hadoop/hbase/regionserver/LogRoller.java | 16 +-
.../hbase/replication/ReplicationEndpoint.java | 8 +
.../HBaseInterClusterReplicationEndpoint.java | 6 +-
.../replication/regionserver/MetricsSource.java | 33 +-
.../regionserver/ReplicationSource.java | 1204 ++++++++++--------
.../regionserver/ReplicationSourceManager.java | 189 ++-
.../hadoop/hbase/wal/DefaultWALProvider.java | 11 +
.../org/apache/hadoop/hbase/wal/WALFactory.java | 2 +-
.../replication/TestReplicationEndpoint.java | 29 +-
.../TestReplicationEndpointWithMultipleWAL.java | 33 +
...onKillMasterRSCompressedWithMultipleWAL.java | 34 +
...estReplicationSyncUpToolWithMultipleWAL.java | 34 +
...egionReplicaReplicationEndpointNoMaster.java | 4 +-
.../TestReplicationSourceManager.java | 20 +-
.../TestReplicationWALReaderManager.java | 2 +-
16 files changed, 1022 insertions(+), 612 deletions(-)
----------------------------------------------------------------------
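At a glance, the heart of this change is that ReplicationSource now keeps one log queue and one worker thread per WAL group, where a group is identified by the WAL file-name prefix. The following is a minimal editor's sketch of that grouping idea only, not code from the commit; the prefix-extraction helper is hypothetical (the real patch adds DefaultWALProvider.getWALPrefixFromWALName for this), and the real per-group queues feed ReplicationSourceWorkerThread instances shown later in the diff.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;

// Sketch only: keep one queue of WAL names per WAL group (file-name prefix).
public class WalGroupQueues {
  private final ConcurrentMap<String, PriorityBlockingQueue<String>> queues =
      new ConcurrentHashMap<String, PriorityBlockingQueue<String>>();

  // Hypothetical prefix extraction: strip the trailing ".timestamp" from a WAL name.
  static String walPrefix(String walName) {
    int idx = walName.lastIndexOf('.');
    return idx < 0 ? walName : walName.substring(0, idx);
  }

  public void enqueue(String walName) {
    String group = walPrefix(walName);
    PriorityBlockingQueue<String> queue = queues.get(group);
    if (queue == null) {
      PriorityBlockingQueue<String> candidate = new PriorityBlockingQueue<String>();
      queue = queues.putIfAbsent(group, candidate);
      if (queue == null) {
        queue = candidate;
        // the real patch starts a ReplicationSourceWorkerThread for the new group here
      }
    }
    queue.add(walName);
  }
}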
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 5dddd9d..d1147d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3325,4 +3325,13 @@ public class HRegionServer extends HasThread implements
}
return max;
}
+
+ /**
+ * For testing
+ * @return whether all wal roll request finished for this regionserver
+ */
+ @VisibleForTesting
+ public boolean walRollRequestFinished() {
+ return this.walRoller.walRollFinished();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index f35fdb7..136e03e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -197,4 +197,18 @@ public class LogRoller extends HasThread {
requester);
}
}
-}
\ No newline at end of file
+
+ /**
+ * For testing only
+ * @return true if all WAL roll finished
+ */
+ @VisibleForTesting
+ public boolean walRollFinished() {
+ for (boolean needRoll : walNeedsRoll.values()) {
+ if (needRoll) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
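The two additions above give tests a way to wait until every WAL on the region server has finished rolling. A hedged usage sketch follows; the helper name, timeout, and polling interval are illustrative and not part of the commit, while walRollRequestFinished() and walRollFinished() are the methods added in this diff.

// Test-side sketch: block until the region server reports all WAL roll requests done.
// HRegionServer#walRollRequestFinished() delegates to LogRoller#walRollFinished().
static void waitForWalRolls(org.apache.hadoop.hbase.regionserver.HRegionServer rs,
    long timeoutMs) throws InterruptedException {
  long deadline = System.currentTimeMillis() + timeoutMs;
  while (!rs.walRollRequestFinished()) {
    if (System.currentTimeMillis() > deadline) {
      throw new AssertionError("WAL roll requests still pending after " + timeoutMs + " ms");
    }
    Thread.sleep(100); // poll; fine for a test utility
  }
}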
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index e8a7ddc..ac1257f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -137,6 +137,7 @@ public interface ReplicationEndpoint extends Service {
static class ReplicateContext {
List<Entry> entries;
int size;
+ String walGroupId;
@InterfaceAudience.Private
public ReplicateContext() {
}
@@ -149,12 +150,19 @@ public interface ReplicationEndpoint extends Service {
this.size = size;
return this;
}
+ public ReplicateContext setWalGroupId(String walGroupId) {
+ this.walGroupId = walGroupId;
+ return this;
+ }
public List<Entry> getEntries() {
return entries;
}
public int getSize() {
return size;
}
+ public String getWalGroupId(){
+ return walGroupId;
+ }
}
/**
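ReplicateContext now carries the WAL group id alongside the entries and size, so a replication endpoint can attribute its work (and its metrics) to the group that produced the batch. The fragment below is an editor's sketch of only the replicate() override in a hypothetical endpoint subclass, not code from the commit; the surrounding class scaffolding and any actual shipping logic are omitted.

// Sketch: a replicate() implementation reading the new walGroupId field.
@Override
public boolean replicate(ReplicateContext context) {
  // The new accessor tells the endpoint which WAL group this batch came from,
  // so per-group metrics such as setAgeOfLastShippedOp(timestamp, walGroup) stay accurate.
  String walGroupId = context.getWalGroupId();
  List<Entry> entries = context.getEntries();
  // ... ship 'entries' to the peer, then record metrics against 'walGroupId' ...
  return true; // acknowledge the batch; a real endpoint returns false on failure
}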
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 44ec804..0dfad0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -154,6 +154,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
@Override
public boolean replicate(ReplicateContext replicateContext) {
List<Entry> entries = replicateContext.getEntries();
+ String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
if (!peersSelected && this.isRunning()) {
@@ -222,12 +223,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
throw iox;
}
// update metrics
- this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
+ this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+ walGroupId);
return true;
} catch (IOException ioe) {
// Didn't ship anything, but must still age the last time we did
- this.metrics.refreshAgeOfLastShippedOp();
+ this.metrics.refreshAgeOfLastShippedOp(walGroupId);
if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException();
LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index cf08787..f9f7001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.replication.regionserver;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -34,7 +37,8 @@ public class MetricsSource {
private static final Log LOG = LogFactory.getLog(MetricsSource.class);
- private long lastTimestamp = 0;
+ // tracks last shipped timestamp for each wal group
+ private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
private int lastQueueSize = 0;
private String id;
@@ -56,23 +60,29 @@ public class MetricsSource {
/**
* Set the age of the last edit that was shipped
- *
* @param timestamp write time of the edit
+ * @param walGroup which group we are setting
*/
- public void setAgeOfLastShippedOp(long timestamp) {
+ public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
singleSourceSource.setLastShippedAge(age);
globalSourceSource.setLastShippedAge(age);
- this.lastTimestamp = timestamp;
+ this.lastTimeStamps.put(walGroup, timestamp);
}
/**
* Convenience method to use the last given timestamp to refresh the age of the last edit. Used
* when replication fails and need to keep that metric accurate.
+ * @param walGroupId id of the group to update
*/
- public void refreshAgeOfLastShippedOp() {
- if (this.lastTimestamp > 0) {
- setAgeOfLastShippedOp(this.lastTimestamp);
+ public void refreshAgeOfLastShippedOp(String walGroupId) {
+ Long lastTimestamp = this.lastTimeStamps.get(walGroupId);
+ if (lastTimestamp == null) {
+ this.lastTimeStamps.put(walGroupId, 0L);
+ lastTimestamp = 0L;
+ }
+ if (lastTimestamp > 0) {
+ setAgeOfLastShippedOp(lastTimestamp, walGroupId);
}
}
@@ -143,6 +153,7 @@ public class MetricsSource {
public void clear() {
singleSourceSource.clear();
globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+ lastTimeStamps.clear();
lastQueueSize = 0;
}
@@ -163,10 +174,16 @@ public class MetricsSource {
}
/**
- * Get the timeStampsOfLastShippedOp
+ * Get the timeStampsOfLastShippedOp, if there are multiple groups, return the latest one
* @return lastTimestampForAge
*/
public long getTimeStampOfLastShippedOp() {
+ long lastTimestamp = 0L;
+ for (long ts : lastTimeStamps.values()) {
+ if (ts > lastTimestamp) {
+ lastTimestamp = ts;
+ }
+ }
return lastTimestamp;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 697968e..62c31e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -22,11 +22,17 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -77,8 +83,12 @@ public class ReplicationSource extends Thread
implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
- // Queue of logs to process
- private PriorityBlockingQueue<Path> queue;
+ // Queues of logs to process, entry in format of walGroupId->queue,
+ // each presents a queue for one wal group
+ private Map<String, PriorityBlockingQueue<Path>> queues =
+ new HashMap<String, PriorityBlockingQueue<Path>>();
+ // per group queue size, keep no more than this number of logs in each wal group
+ private int queueSizePerGroup;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
@@ -96,35 +106,23 @@ public class ReplicationSource extends Thread
private long replicationQueueSizeCapacity;
// Max number of entries in entriesArray
private int replicationQueueNbCapacity;
- // Our reader for the current log. open/close handled by repLogReader
- private WAL.Reader reader;
- // Last position in the log that we sent to ZooKeeper
- private long lastLoggedPosition = -1;
- // Path of the current log
- private volatile Path currentPath;
private FileSystem fs;
// id of this cluster
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
- private long totalReplicatedEdits = 0;
+ private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// total number of edits we replicated
- private long totalReplicatedOperations = 0;
+ private AtomicLong totalReplicatedOperations = new AtomicLong(0);
// The znode we currently play with
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
- // Current number of operations (Put/Delete) that we need to replicate
- private int currentNbOperations = 0;
- // Current size of data we need to replicate
- private int currentSize = 0;
// Indicates if this particular source is running
- private volatile boolean running = true;
+ private volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
- // Handle on the log reader helper
- private ReplicationWALReaderManager repLogReader;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
@@ -133,6 +131,9 @@ public class ReplicationSource extends Thread
private WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
+ private AtomicInteger logQueueSize = new AtomicInteger(0);
+ private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
+ new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
/**
* Instantiation method used by region servers
@@ -165,10 +166,7 @@ public class ReplicationSource extends Thread
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
- this.queue =
- new PriorityBlockingQueue<Path>(
- this.conf.getInt("hbase.regionserver.maxlogs", 32),
- new LogsComparator());
+ this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
@@ -176,7 +174,6 @@ public class ReplicationSource extends Thread
this.manager = manager;
this.fs = fs;
this.metrics = metrics;
- this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
this.clusterId = clusterId;
this.peerClusterZnode = peerClusterZnode;
@@ -196,13 +193,33 @@ public class ReplicationSource extends Thread
@Override
public void enqueueLog(Path log) {
- this.queue.put(log);
- int queueSize = queue.size();
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
+ PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+ if (queue == null) {
+ queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+ queues.put(logPrefix, queue);
+ if (this.sourceRunning) {
+ // new wal group observed after source startup, start a new worker thread to track it
+ // notice: it's possible that log enqueued when this.running is set but worker thread
+ // still not launched, so it's necessary to check workerThreads before start the worker
+ final ReplicationSourceWorkerThread worker =
+ new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
+ ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
+ if (extant != null) {
+ LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
+ } else {
+ LOG.debug("Starting up worker for wal group " + logPrefix);
+ worker.startup();
+ }
+ }
+ }
+ queue.put(log);
+ int queueSize = logQueueSize.incrementAndGet();
this.metrics.setSizeOfLogQueue(queueSize);
// This will log a warning for each new log that gets created above the warn threshold
- if (queueSize > this.logQueueWarnThreshold) {
- LOG.warn("Queue size: " + queueSize +
- " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+ if (queue.size() > this.logQueueWarnThreshold) {
+ LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+ + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
}
}
@@ -217,12 +234,8 @@ public class ReplicationSource extends Thread
@Override
public void run() {
- // We were stopped while looping to connect to sinks, just abort
- if (!this.isActive()) {
- uninitialize();
- return;
- }
-
+ // mark we are running now
+ this.sourceRunning = true;
try {
// start the endpoint, connect to the cluster
Service.State state = replicationEndpoint.start().get();
@@ -247,22 +260,14 @@ public class ReplicationSource extends Thread
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
- while (this.isActive() && this.peerClusterId == null) {
+ while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
- if (this.isActive() && this.peerClusterId == null) {
+ if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
- // We were stopped while looping to contact peer's zk ensemble, just abort
- if (!this.isActive()) {
- uninitialize();
- return;
- }
-
- // resetting to 1 to reuse later
- sleepMultiplier = 1;
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
@@ -271,349 +276,21 @@ public class ReplicationSource extends Thread
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
}
- LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
-
- // If this is recovered, the queue is already full and the first log
- // normally has a position (unless the RS failed between 2 logs)
- if (this.replicationQueueInfo.isQueueRecovered()) {
- try {
- this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode,
- this.queue.peek().getName()));
- if (LOG.isTraceEnabled()) {
- LOG.trace("Recovered queue started with log " + this.queue.peek() +
- " at position " + this.repLogReader.getPosition());
- }
- } catch (ReplicationException e) {
- this.terminate("Couldn't get the position of this recovered queue " +
- this.peerClusterZnode, e);
- }
- }
- // Loop until we close down
- while (isActive()) {
- // Sleep until replication is enabled again
- if (!isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- Path oldPath = getCurrentPath(); //note that in the current scenario,
- //oldPath will be null when a log roll
- //happens.
- // Get a new path
- boolean hasCurrentPath = getNextPath();
- if (getCurrentPath() != null && oldPath == null) {
- sleepMultiplier = 1; //reset the sleepMultiplier on a path change
- }
- if (!hasCurrentPath) {
- if (sleepForRetries("No log to process", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- boolean currentWALisBeingWrittenTo = false;
- //For WAL files we own (rather than recovered), take a snapshot of whether the
- //current WAL file (this.currentPath) is in use (for writing) NOW!
- //Since the new WAL paths are enqueued only after the prev WAL file
- //is 'closed', presence of an element in the queue means that
- //the previous WAL file was closed, else the file is in use (currentPath)
- //We take the snapshot now so that we are protected against races
- //where a new file gets enqueued while the current file is being processed
- //(and where we just finished reading the current file).
- if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
- currentWALisBeingWrittenTo = true;
- }
- // Open a reader on it
- if (!openReader(sleepMultiplier)) {
- // Reset the sleep multiplier, else it'd be reused for the next file
- sleepMultiplier = 1;
- continue;
- }
-
- // If we got a null reader but didn't continue, then sleep and continue
- if (this.reader == null) {
- if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
-
- boolean gotIOE = false;
- currentNbOperations = 0;
- List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
- currentSize = 0;
- try {
- if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
- continue;
- }
- } catch (IOException ioe) {
- LOG.warn(this.peerClusterZnode + " Got: ", ioe);
- gotIOE = true;
- if (ioe.getCause() instanceof EOFException) {
-
- boolean considerDumping = false;
- if (this.replicationQueueInfo.isQueueRecovered()) {
- try {
- FileStatus stat = this.fs.getFileStatus(this.currentPath);
- if (stat.getLen() == 0) {
- LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
- }
- considerDumping = true;
- } catch (IOException e) {
- LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
- }
- }
-
- if (considerDumping &&
- sleepMultiplier == this.maxRetriesMultiplier &&
- processEndOfFile()) {
- continue;
- }
- }
- } finally {
- try {
- this.reader = null;
- this.repLogReader.closeReader();
- } catch (IOException e) {
- gotIOE = true;
- LOG.warn("Unable to finalize the tailing of a file", e);
- }
- }
-
- // If we didn't get anything to replicate, or if we hit a IOE,
- // wait a bit and retry.
- // But if we need to stop, don't bother sleeping
- if (this.isActive() && (gotIOE || entries.isEmpty())) {
- if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
- this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.repLogReader.getPosition(),
- this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.repLogReader.getPosition();
- }
- // Reset the sleep multiplier if nothing has actually gone wrong
- if (!gotIOE) {
- sleepMultiplier = 1;
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
- this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
- }
- if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- sleepMultiplier = 1;
- shipEdits(currentWALisBeingWrittenTo, entries);
- }
- uninitialize();
- }
-
- /**
- * Read all the entries from the current log files and retain those
- * that need to be replicated. Else, process the end of the current file.
- * @param currentWALisBeingWrittenTo is the current WAL being written to
- * @param entries resulting entries to be replicated
- * @return true if we got nothing and went to the next file, false if we got
- * entries
- * @throws IOException
- */
- protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
- List<WAL.Entry> entries) throws IOException {
- long seenEntries = 0;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Seeking in " + this.currentPath + " at position "
- + this.repLogReader.getPosition());
- }
- this.repLogReader.seek();
- long positionBeforeRead = this.repLogReader.getPosition();
- WAL.Entry entry =
- this.repLogReader.readNextAndSetPosition();
- while (entry != null) {
- this.metrics.incrLogEditsRead();
- seenEntries++;
-
- // don't replicate if the log entries have already been consumed by the cluster
- if (replicationEndpoint.canReplicateToSameCluster()
- || !entry.getKey().getClusterIds().contains(peerClusterId)) {
- // Remove all KVs that should not be replicated
- entry = walEntryFilter.filter(entry);
- WALEdit edit = null;
- WALKey logKey = null;
- if (entry != null) {
- edit = entry.getEdit();
- logKey = entry.getKey();
- }
-
- if (edit != null && edit.size() != 0) {
- //Mark that the current cluster has the change
- logKey.addClusterId(clusterId);
- currentNbOperations += countDistinctRowKeys(edit);
- entries.add(entry);
- currentSize += entry.getEdit().heapSize();
- } else {
- this.metrics.incrLogEditsFiltered();
- }
- }
- // Stop if too many entries or too big
- if (currentSize >= this.replicationQueueSizeCapacity ||
- entries.size() >= this.replicationQueueNbCapacity) {
- break;
- }
- try {
- entry = this.repLogReader.readNextAndSetPosition();
- } catch (IOException ie) {
- LOG.debug("Break on IOE: " + ie.getMessage());
- break;
- }
- }
- metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
- if (currentWALisBeingWrittenTo) {
- return false;
- }
- // If we didn't get anything and the queue has an object, it means we
- // hit the end of the file for sure
- return seenEntries == 0 && processEndOfFile();
- }
-
- /**
- * Poll for the next path
- * @return true if a path was obtained, false if not
- */
- protected boolean getNextPath() {
- try {
- if (this.currentPath == null) {
- this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
- this.metrics.setSizeOfLogQueue(queue.size());
- if (this.currentPath != null) {
- this.manager.cleanOldLogs(this.currentPath.getName(),
- this.peerId,
- this.replicationQueueInfo.isQueueRecovered());
- if (LOG.isTraceEnabled()) {
- LOG.trace("New log: " + this.currentPath);
- }
- }
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while reading edits", e);
- }
- return this.currentPath != null;
- }
-
- /**
- * Open a reader on the current path
- *
- * @param sleepMultiplier by how many times the default sleeping time is augmented
- * @return true if we should continue with that file, false if we are over with it
- */
- protected boolean openReader(int sleepMultiplier) {
- try {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Opening log " + this.currentPath);
- }
- this.reader = repLogReader.openReader(this.currentPath);
- } catch (FileNotFoundException fnfe) {
- if (this.replicationQueueInfo.isQueueRecovered()) {
- // We didn't find the log in the archive directory, look if it still
- // exists in the dead RS folder (there could be a chain of failures
- // to look at)
- List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
- LOG.info("NB dead servers : " + deadRegionServers.size());
- final Path rootDir = FSUtils.getRootDir(this.conf);
- for (String curDeadServerName : deadRegionServers) {
- final Path deadRsDirectory = new Path(rootDir,
- DefaultWALProvider.getWALDirectoryName(curDeadServerName));
- Path[] locs = new Path[] {
- new Path(deadRsDirectory, currentPath.getName()),
- new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
- currentPath.getName()),
- };
- for (Path possibleLogLocation : locs) {
- LOG.info("Possible location " + possibleLogLocation.toUri().toString());
- if (this.manager.getFs().exists(possibleLogLocation)) {
- // We found the right new location
- LOG.info("Log " + this.currentPath + " still exists at " +
- possibleLogLocation);
- // Breaking here will make us sleep since reader is null
- // TODO why don't we need to set currentPath and call openReader here?
- return true;
- }
- }
- }
- // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
- // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
- if (stopper instanceof ReplicationSyncUp.DummyServer) {
- // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
- // area rather than to the wal area for a particular region server.
- FileStatus[] rss = fs.listStatus(manager.getLogDir());
- for (FileStatus rs : rss) {
- Path p = rs.getPath();
- FileStatus[] logs = fs.listStatus(p);
- for (FileStatus log : logs) {
- p = new Path(p, log.getPath().getName());
- if (p.getName().equals(currentPath.getName())) {
- currentPath = p;
- LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
- // Open the log at the new location
- this.openReader(sleepMultiplier);
- return true;
- }
- }
- }
- }
-
- // TODO What happens if the log was missing from every single location?
- // Although we need to check a couple of times as the log could have
- // been moved by the master between the checks
- // It can also happen if a recovered queue wasn't properly cleaned,
- // such that the znode pointing to a log exists but the log was
- // deleted a long time ago.
- // For the moment, we'll throw the IO and processEndOfFile
- throw new IOException("File from recovered queue is " +
- "nowhere to be found", fnfe);
- } else {
- // If the log was archived, continue reading from there
- Path archivedLogLocation =
- new Path(manager.getOldLogDir(), currentPath.getName());
- if (this.manager.getFs().exists(archivedLogLocation)) {
- currentPath = archivedLogLocation;
- LOG.info("Log " + this.currentPath + " was moved to " +
- archivedLogLocation);
- // Open the log at the new location
- this.openReader(sleepMultiplier);
-
- }
- // TODO What happens the log is missing in both places?
- }
- }
- } catch (IOException ioe) {
- if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
- LOG.warn(this.peerClusterZnode + " Got: ", ioe);
- this.reader = null;
- if (ioe.getCause() instanceof NullPointerException) {
- // Workaround for race condition in HDFS-4380
- // which throws a NPE if we open a file before any data node has the most recent block
- // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
- LOG.warn("Got NPE opening reader, will retry.");
- } else if (sleepMultiplier == this.maxRetriesMultiplier) {
- // TODO Need a better way to determine if a file is really gone but
- // TODO without scanning all logs dir
- LOG.warn("Waited too long for this file, considering dumping");
- return !processEndOfFile();
+ LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+ // start workers
+ for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+ String walGroupId = entry.getKey();
+ PriorityBlockingQueue<Path> queue = entry.getValue();
+ final ReplicationSourceWorkerThread worker =
+ new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
+ ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
+ if (extant != null) {
+ LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+ } else {
+ LOG.debug("Starting up worker for wal group " + walGroupId);
+ worker.startup();
}
}
- return true;
- }
-
- /*
- * Checks whether the current log file is empty, and it is not a recovered queue. This is to
- * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
- * trying to read the log file and get EOFException. In case of a recovered queue the last log
- * file may be empty, and we don't want to retry that.
- */
- private boolean isCurrentLogEmpty() {
- return (this.repLogReader.getPosition() == 0 &&
- !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
}
/**
@@ -636,101 +313,6 @@ public class ReplicationSource extends Thread
}
/**
- * Count the number of different row keys in the given edit because of
- * mini-batching. We assume that there's at least one Cell in the WALEdit.
- * @param edit edit to count row keys from
- * @return number of different row keys
- */
- private int countDistinctRowKeys(WALEdit edit) {
- List<Cell> cells = edit.getCells();
- int distinctRowKeys = 1;
- Cell lastCell = cells.get(0);
- for (int i = 0; i < edit.size(); i++) {
- if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
- distinctRowKeys++;
- }
- lastCell = cells.get(i);
- }
- return distinctRowKeys;
- }
-
- /**
- * Do the shipping logic
- * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
- * written to when this method was called
- */
- protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
- int sleepMultiplier = 0;
- if (entries.isEmpty()) {
- LOG.warn("Was given 0 edits to ship");
- return;
- }
- while (this.isActive()) {
- try {
- if (this.throttler.isEnabled()) {
- long sleepTicks = this.throttler.getNextSleepInterval(currentSize);
- if (sleepTicks > 0) {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
- }
- Thread.sleep(sleepTicks);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping for throttling control");
- Thread.currentThread().interrupt();
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- continue;
- }
- // reset throttler's cycle start tick when sleep for throttling occurs
- this.throttler.resetStartTick();
- }
- }
- // create replicateContext here, so the entries can be GC'd upon return from this call stack
- ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
- replicateContext.setEntries(entries).setSize(currentSize);
-
- long startTimeNs = System.nanoTime();
- // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
- boolean replicated = replicationEndpoint.replicate(replicateContext);
- long endTimeNs = System.nanoTime();
-
- if (!replicated) {
- continue;
- } else {
- sleepMultiplier = Math.max(sleepMultiplier-1, 0);
- }
-
- if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
- this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.repLogReader.getPosition(),
- this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
- this.lastLoggedPosition = this.repLogReader.getPosition();
- }
- if (this.throttler.isEnabled()) {
- this.throttler.addPushSize(currentSize);
- }
- this.totalReplicatedEdits += entries.size();
- this.totalReplicatedOperations += currentNbOperations;
- this.metrics.shipBatch(this.currentNbOperations, this.currentSize/1024);
- this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or "
- + this.totalReplicatedOperations + " operations in " +
- ((endTimeNs - startTimeNs)/1000000) + " ms");
- }
- break;
- } catch (Exception ex) {
- LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:" +
- org.apache.hadoop.util.StringUtils.stringifyException(ex));
- if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
-
- /**
* check whether the peer is enabled or not
*
* @return true if the peer is enabled, otherwise false
@@ -739,53 +321,17 @@ public class ReplicationSource extends Thread
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
- /**
- * If the queue isn't empty, switch to the next one
- * Else if this is a recovered queue, it means we're done!
- * Else we'll just continue to try reading the log file
- * @return true if we're done with the current file, false if we should
- * continue trying to read from it
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
- justification="Yeah, this is how it works")
- protected boolean processEndOfFile() {
- if (this.queue.size() != 0) {
- if (LOG.isTraceEnabled()) {
- String filesize = "N/A";
- try {
- FileStatus stat = this.fs.getFileStatus(this.currentPath);
- filesize = stat.getLen()+"";
- } catch (IOException ex) {}
- LOG.trace("Reached the end of a log, stats: " + getStats() +
- ", and the length of the file is " + filesize);
- }
- this.currentPath = null;
- this.repLogReader.finishCurrentFile();
- this.reader = null;
- return true;
- } else if (this.replicationQueueInfo.isQueueRecovered()) {
- this.manager.closeRecoveredQueue(this);
- LOG.info("Finished recovering the queue with the following stats " + getStats());
- this.running = false;
- return true;
- }
- return false;
- }
-
@Override
public void startup() {
String n = Thread.currentThread().getName();
- Thread.UncaughtExceptionHandler handler =
- new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.error("Unexpected exception in ReplicationSource," +
- " currentPath=" + currentPath, e);
- }
- };
- Threads.setDaemonThreadRunning(
- this, n + ".replicationSource," +
- this.peerClusterZnode, handler);
+ Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LOG.error("Unexpected exception in ReplicationSource", e);
+ }
+ };
+ Threads
+ .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
}
@Override
@@ -807,14 +353,21 @@ public class ReplicationSource extends Thread
LOG.error("Closing source " + this.peerClusterZnode
+ " because an error occurred: " + reason, cause);
}
- this.running = false;
- this.interrupt();
+ this.sourceRunning = false;
+ Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
+ for (ReplicationSourceWorkerThread worker : workers) {
+ worker.setWorkerRunning(false);
+ worker.interrupt();
+ }
ListenableFuture<Service.State> future = null;
if (this.replicationEndpoint != null) {
future = this.replicationEndpoint.stop();
}
if (join) {
- Threads.shutdown(this, this.sleepForRetries);
+ for (ReplicationSourceWorkerThread worker : workers) {
+ Threads.shutdown(worker, this.sleepForRetries);
+ LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+ }
if (future != null) {
try {
future.get();
@@ -837,11 +390,15 @@ public class ReplicationSource extends Thread
@Override
public Path getCurrentPath() {
- return this.currentPath;
+ // only for testing
+ for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+ if (worker.getCurrentPath() != null) return worker.getCurrentPath();
+ }
+ return null;
}
- private boolean isActive() {
- return !this.stopper.isStopped() && this.running && !isInterrupted();
+ private boolean isSourceActive() {
+ return !this.stopper.isStopped() && this.sourceRunning;
}
/**
@@ -868,10 +425,23 @@ public class ReplicationSource extends Thread
@Override
public String getStats() {
- long position = this.repLogReader.getPosition();
- return "Total replicated edits: " + totalReplicatedEdits +
- ", currently replicating from: " + this.currentPath +
- " at position: " + position;
+ StringBuilder sb = new StringBuilder();
+ sb.append("Total replicated edits: ").append(totalReplicatedEdits)
+ .append(", current progress: \n");
+ for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
+ String walGroupId = entry.getKey();
+ ReplicationSourceWorkerThread worker = entry.getValue();
+ long position = worker.getCurrentPosition();
+ Path currentPath = worker.getCurrentPath();
+ sb.append("walGroup [").append(walGroupId).append("]: ");
+ if (currentPath != null) {
+ sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+ .append(position).append("\n");
+ } else {
+ sb.append("no replication ongoing, waiting for new log");
+ }
+ }
+ return sb.toString();
}
/**
@@ -881,4 +451,572 @@ public class ReplicationSource extends Thread
public MetricsSource getSourceMetrics() {
return this.metrics;
}
+
+ public class ReplicationSourceWorkerThread extends Thread {
+ private ReplicationSource source;
+ private String walGroupId;
+ private PriorityBlockingQueue<Path> queue;
+ private ReplicationQueueInfo replicationQueueInfo;
+ // Our reader for the current log. open/close handled by repLogReader
+ private WAL.Reader reader;
+ // Last position in the log that we sent to ZooKeeper
+ private long lastLoggedPosition = -1;
+ // Path of the current log
+ private volatile Path currentPath;
+ // Handle on the log reader helper
+ private ReplicationWALReaderManager repLogReader;
+ // Current number of operations (Put/Delete) that we need to replicate
+ private int currentNbOperations = 0;
+ // Current size of data we need to replicate
+ private int currentSize = 0;
+ // Indicates whether this particular worker is running
+ private boolean workerRunning = true;
+
+ public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
+ ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
+ this.walGroupId = walGroupId;
+ this.queue = queue;
+ this.replicationQueueInfo = replicationQueueInfo;
+ this.repLogReader = new ReplicationWALReaderManager(fs, conf);
+ this.source = source;
+ }
+
+ @Override
+ public void run() {
+ // If this is recovered, the queue is already full and the first log
+ // normally has a position (unless the RS failed between 2 logs)
+ if (this.replicationQueueInfo.isQueueRecovered()) {
+ try {
+ this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
+ this.queue.peek().getName()));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ + this.repLogReader.getPosition());
+ }
+ } catch (ReplicationException e) {
+ terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+ }
+ }
+ // Loop until we close down
+ while (isWorkerActive()) {
+ int sleepMultiplier = 1;
+ // Sleep until replication is enabled again
+ if (!isPeerEnabled()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ Path oldPath = getCurrentPath(); //note that in the current scenario,
+ //oldPath will be null when a log roll
+ //happens.
+ // Get a new path
+ boolean hasCurrentPath = getNextPath();
+ if (getCurrentPath() != null && oldPath == null) {
+ sleepMultiplier = 1; //reset the sleepMultiplier on a path change
+ }
+ if (!hasCurrentPath) {
+ if (sleepForRetries("No log to process", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ boolean currentWALisBeingWrittenTo = false;
+ //For WAL files we own (rather than recovered), take a snapshot of whether the
+ //current WAL file (this.currentPath) is in use (for writing) NOW!
+ //Since the new WAL paths are enqueued only after the prev WAL file
+ //is 'closed', presence of an element in the queue means that
+ //the previous WAL file was closed, else the file is in use (currentPath)
+ //We take the snapshot now so that we are protected against races
+ //where a new file gets enqueued while the current file is being processed
+ //(and where we just finished reading the current file).
+ if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
+ currentWALisBeingWrittenTo = true;
+ }
+ // Open a reader on it
+ if (!openReader(sleepMultiplier)) {
+ // Reset the sleep multiplier, else it'd be reused for the next file
+ sleepMultiplier = 1;
+ continue;
+ }
+
+ // If we got a null reader but didn't continue, then sleep and continue
+ if (this.reader == null) {
+ if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ boolean gotIOE = false;
+ currentNbOperations = 0;
+ List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+ currentSize = 0;
+ try {
+ if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
+ continue;
+ }
+ } catch (IOException ioe) {
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ gotIOE = true;
+ if (ioe.getCause() instanceof EOFException) {
+
+ boolean considerDumping = false;
+ if (this.replicationQueueInfo.isQueueRecovered()) {
+ try {
+ FileStatus stat = fs.getFileStatus(this.currentPath);
+ if (stat.getLen() == 0) {
+ LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+ }
+ considerDumping = true;
+ } catch (IOException e) {
+ LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+ }
+ }
+
+ if (considerDumping &&
+ sleepMultiplier == maxRetriesMultiplier &&
+ processEndOfFile()) {
+ continue;
+ }
+ }
+ } finally {
+ try {
+ this.reader = null;
+ this.repLogReader.closeReader();
+ } catch (IOException e) {
+ gotIOE = true;
+ LOG.warn("Unable to finalize the tailing of a file", e);
+ }
+ }
+
+ // If we didn't get anything to replicate, or if we hit a IOE,
+ // wait a bit and retry.
+ // But if we need to stop, don't bother sleeping
+ if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+ manager.logPositionAndCleanOldLogs(this.currentPath,
+ peerClusterZnode, this.repLogReader.getPosition(),
+ this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
+ }
+ // Reset the sleep multiplier if nothing has actually gone wrong
+ if (!gotIOE) {
+ sleepMultiplier = 1;
+ // if there was nothing to ship and it's not an error
+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
+ metrics.setAgeOfLastShippedOp(System.currentTimeMillis(), walGroupId);
+ }
+ if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+ sleepMultiplier = 1;
+ shipEdits(currentWALisBeingWrittenTo, entries);
+ }
+ if (replicationQueueInfo.isQueueRecovered()) {
+ // use synchronize to make sure one last thread will clean the queue
+ synchronized (workerThreads) {
+ Threads.sleep(100);// wait a short while for other worker thread to fully exit
+ boolean allOtherTaskDone = true;
+ for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
+ if (!worker.equals(this) && worker.isAlive()) {
+ allOtherTaskDone = false;
+ break;
+ }
+ }
+ if (allOtherTaskDone) {
+ manager.closeRecoveredQueue(this.source);
+ LOG.info("Finished recovering queue " + peerClusterZnode
+ + " with the following stats: " + getStats());
+ }
+ }
+ }
+ }
+
+ /**
+ * Read all the entries from the current log files and retain those that need to be replicated.
+ * Else, process the end of the current file.
+ * @param currentWALisBeingWrittenTo is the current WAL being written to
+ * @param entries resulting entries to be replicated
+ * @return true if we got nothing and went to the next file, false if we got entries
+ * @throws IOException
+ */
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
+ List<WAL.Entry> entries) throws IOException {
+ long seenEntries = 0;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Seeking in " + this.currentPath + " at position "
+ + this.repLogReader.getPosition());
+ }
+ this.repLogReader.seek();
+ long positionBeforeRead = this.repLogReader.getPosition();
+ WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
+ while (entry != null) {
+ metrics.incrLogEditsRead();
+ seenEntries++;
+
+ // don't replicate if the log entries have already been consumed by the cluster
+ if (replicationEndpoint.canReplicateToSameCluster()
+ || !entry.getKey().getClusterIds().contains(peerClusterId)) {
+ // Remove all KVs that should not be replicated
+ entry = walEntryFilter.filter(entry);
+ WALEdit edit = null;
+ WALKey logKey = null;
+ if (entry != null) {
+ edit = entry.getEdit();
+ logKey = entry.getKey();
+ }
+
+ if (edit != null && edit.size() != 0) {
+ // Mark that the current cluster has the change
+ logKey.addClusterId(clusterId);
+ currentNbOperations += countDistinctRowKeys(edit);
+ entries.add(entry);
+ currentSize += entry.getEdit().heapSize();
+ } else {
+ metrics.incrLogEditsFiltered();
+ }
+ }
+ // Stop if too many entries or too big
+ // FIXME check the relationship between single wal group and overall
+ if (currentSize >= replicationQueueSizeCapacity
+ || entries.size() >= replicationQueueNbCapacity) {
+ break;
+ }
+ try {
+ entry = this.repLogReader.readNextAndSetPosition();
+ } catch (IOException ie) {
+ LOG.debug("Break on IOE: " + ie.getMessage());
+ break;
+ }
+ }
+ metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
+ if (currentWALisBeingWrittenTo) {
+ return false;
+ }
+ // If we didn't get anything and the queue has an object, it means we
+ // hit the end of the file for sure
+ return seenEntries == 0 && processEndOfFile();
+ }
+
+ /**
+ * Poll for the next path
+ * @return true if a path was obtained, false if not
+ */
+ protected boolean getNextPath() {
+ try {
+ if (this.currentPath == null) {
+ this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
+ int queueSize = logQueueSize.decrementAndGet();
+ metrics.setSizeOfLogQueue(queueSize);
+ if (this.currentPath != null) {
+ // For recovered queue: must use peerClusterZnode since peerId is a parsed value
+ manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
+ this.replicationQueueInfo.isQueueRecovered());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("New log: " + this.currentPath);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while reading edits", e);
+ }
+ return this.currentPath != null;
+ }
+
+ /**
+ * Open a reader on the current path
+ *
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return true if we should continue with that file, false if we are over with it
+ */
+ protected boolean openReader(int sleepMultiplier) {
+
+ try {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Opening log " + this.currentPath);
+ }
+ this.reader = repLogReader.openReader(this.currentPath);
+ } catch (FileNotFoundException fnfe) {
+ if (this.replicationQueueInfo.isQueueRecovered()) {
+ // We didn't find the log in the archive directory, look if it still
+ // exists in the dead RS folder (there could be a chain of failures
+ // to look at)
+ List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+ LOG.info("NB dead servers : " + deadRegionServers.size());
+ final Path rootDir = FSUtils.getRootDir(conf);
+ for (String curDeadServerName : deadRegionServers) {
+ final Path deadRsDirectory = new Path(rootDir,
+ DefaultWALProvider.getWALDirectoryName(curDeadServerName));
+ Path[] locs = new Path[] {
+ new Path(deadRsDirectory, currentPath.getName()),
+ new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
+ currentPath.getName()),
+ };
+ for (Path possibleLogLocation : locs) {
+ LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+ if (manager.getFs().exists(possibleLogLocation)) {
+ // We found the right new location
+ LOG.info("Log " + this.currentPath + " still exists at " +
+ possibleLogLocation);
+ // Breaking here will make us sleep since reader is null
+ // TODO why don't we need to set currentPath and call openReader here?
+ return true;
+ }
+ }
+ }
+ // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
+ // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
+ if (stopper instanceof ReplicationSyncUp.DummyServer) {
+ // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
+ // area rather than to the wal area for a particular region server.
+ FileStatus[] rss = fs.listStatus(manager.getLogDir());
+ for (FileStatus rs : rss) {
+ Path p = rs.getPath();
+ FileStatus[] logs = fs.listStatus(p);
+ for (FileStatus log : logs) {
+ p = new Path(p, log.getPath().getName());
+ if (p.getName().equals(currentPath.getName())) {
+ currentPath = p;
+ LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
+ // Open the log at the new location
+ this.openReader(sleepMultiplier);
+ return true;
+ }
+ }
+ }
+ }
+
+ // TODO What happens if the log was missing from every single location?
+ // Although we need to check a couple of times as the log could have
+ // been moved by the master between the checks
+ // It can also happen if a recovered queue wasn't properly cleaned,
+ // such that the znode pointing to a log exists but the log was
+ // deleted a long time ago.
+ // For the moment, we'll throw the IO and processEndOfFile
+ throw new IOException("File from recovered queue is " +
+ "nowhere to be found", fnfe);
+ } else {
+ // If the log was archived, continue reading from there
+ Path archivedLogLocation =
+ new Path(manager.getOldLogDir(), currentPath.getName());
+ if (manager.getFs().exists(archivedLogLocation)) {
+ currentPath = archivedLogLocation;
+ LOG.info("Log " + this.currentPath + " was moved to " +
+ archivedLogLocation);
+ // Open the log at the new location
+ this.openReader(sleepMultiplier);
+
+ }
+ // TODO What happens the log is missing in both places?
+ }
+ }
+ } catch (IOException ioe) {
+ if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
+ LOG.warn(peerClusterZnode + " Got: ", ioe);
+ this.reader = null;
+ if (ioe.getCause() instanceof NullPointerException) {
+ // Workaround for race condition in HDFS-4380
+ // which throws a NPE if we open a file before any data node has the most recent block
+ // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+ LOG.warn("Got NPE opening reader, will retry.");
+ } else if (sleepMultiplier == maxRetriesMultiplier) {
+ // TODO Need a better way to determine if a file is really gone but
+ // TODO without scanning all logs dir
+ LOG.warn("Waited too long for this file, considering dumping");
+ return !processEndOfFile();
+ }
+ }
+ return true;
+ }
+
+ /*
+ * Checks whether the current log file is empty, and it is not a recovered queue. This is to
+ * handle scenario when in an idle cluster, there is no entry in the current log and we keep on
+ * trying to read the log file and get EOFException. In case of a recovered queue the last log
+ * file may be empty, and we don't want to retry that.
+ */
+ private boolean isCurrentLogEmpty() {
+ return (this.repLogReader.getPosition() == 0 &&
+ !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
+ }
+
+ /**
+ * Count the number of different row keys in the given edit because of mini-batching. We assume
+ * that there's at least one Cell in the WALEdit.
+ * @param edit edit to count row keys from
+ * @return number of different row keys
+ */
+ private int countDistinctRowKeys(WALEdit edit) {
+ List<Cell> cells = edit.getCells();
+ int distinctRowKeys = 1;
+ Cell lastCell = cells.get(0);
+ for (int i = 0; i < edit.size(); i++) {
+ if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
+ distinctRowKeys++;
+ }
+ }
+ return distinctRowKeys;
+ }
+
+ /**
+ * Do the shipping logic
+ * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+ * written to when this method was called
+ */
+ protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
+ int sleepMultiplier = 0;
+ if (entries.isEmpty()) {
+ LOG.warn("Was given 0 edits to ship");
+ return;
+ }
+ while (isWorkerActive()) {
+ try {
+ if (throttler.isEnabled()) {
+ long sleepTicks = throttler.getNextSleepInterval(currentSize);
+ if (sleepTicks > 0) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+ }
+ Thread.sleep(sleepTicks);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping for throttling control");
+ Thread.currentThread().interrupt();
+ // current thread might be interrupted to terminate
+ // directly go back to while() for confirm this
+ continue;
+ }
+ // reset throttler's cycle start tick when sleep for throttling occurs
+ throttler.resetStartTick();
+ }
+ }
+ // create replicateContext here, so the entries can be GC'd upon return from this call
+ // stack
+ ReplicationEndpoint.ReplicateContext replicateContext =
+ new ReplicationEndpoint.ReplicateContext();
+ replicateContext.setEntries(entries).setSize(currentSize);
+ replicateContext.setWalGroupId(walGroupId);
+
+ long startTimeNs = System.nanoTime();
+ // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+ boolean replicated = replicationEndpoint.replicate(replicateContext);
+ long endTimeNs = System.nanoTime();
+
+ if (!replicated) {
+ continue;
+ } else {
+ sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
+ }
+
+ if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+ manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+ this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
+ currentWALisBeingWrittenTo);
+ this.lastLoggedPosition = this.repLogReader.getPosition();
+ }
+ if (throttler.isEnabled()) {
+ throttler.addPushSize(currentSize);
+ }
+ totalReplicatedEdits.addAndGet(entries.size());
+ totalReplicatedOperations.addAndGet(currentNbOperations);
+ // FIXME check relationship between wal group and overall
+ metrics.shipBatch(currentNbOperations, currentSize / 1024);
+ metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
+ walGroupId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+ + totalReplicatedOperations + " operations in "
+ + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+ }
+ break;
+ } catch (Exception ex) {
+ LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+ + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+ if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ }
+
+ /**
+ * If the queue isn't empty, switch to the next log file. Else, if this is a recovered queue,
+ * it means we're done! Otherwise we'll just keep trying to read from the current log file.
+ * @return true if we're done with the current file, false if we should continue trying to read
+ * from it
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
+ justification = "Yeah, this is how it works")
+ protected boolean processEndOfFile() {
+ if (this.queue.size() != 0) {
+ if (LOG.isTraceEnabled()) {
+ String filesize = "N/A";
+ try {
+ FileStatus stat = fs.getFileStatus(this.currentPath);
+ filesize = stat.getLen() + "";
+ } catch (IOException ex) {
+ // ignore; the file length is only used in the trace message below
+ }
+ LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
+ + ", and the length of the file is " + filesize);
+ }
+ this.currentPath = null;
+ this.repLogReader.finishCurrentFile();
+ this.reader = null;
+ return true;
+ } else if (this.replicationQueueInfo.isQueueRecovered()) {
+ LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ + peerClusterZnode);
+ workerRunning = false;
+ return true;
+ }
+ return false;
+ }
+
+ public void startup() {
+ String n = Thread.currentThread().getName();
+ Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ + getCurrentPath(), e);
+ }
+ };
+ Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+ + peerClusterZnode, handler);
+ workerThreads.put(walGroupId, this);
+ }
+
+ public Path getCurrentPath() {
+ return this.currentPath;
+ }
+
+ public long getCurrentPosition() {
+ return this.repLogReader.getPosition();
+ }
+
+ private boolean isWorkerActive() {
+ return !stopper.isStopped() && workerRunning && !isInterrupted();
+ }
+
+ private void terminate(String reason, Exception cause) {
+ if (cause == null) {
+ LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+
+ } else {
+ LOG.error("Closing worker for wal group " + this.walGroupId
+ + " because an error occurred: " + reason, cause);
+ }
+ this.interrupt();
+ Threads.shutdown(this, sleepForRetries);
+ LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+ }
+
+ public void setWorkerRunning(boolean workerRunning) {
+ this.workerRunning = workerRunning;
+ }
+ }
}
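For readers skimming the rewritten ReplicationSource above: the key structural change is that WAL entries are no longer consumed from a single queue, but from one queue and one worker thread per WAL group. Below is a minimal, self-contained sketch of that fan-out idea; the class name and the prefix parsing are hypothetical illustrations, not code from the patch.

  import java.util.Map;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.LinkedBlockingQueue;

  // Sketch: one FIFO of WAL names per WAL group, as ReplicationSource now keeps per walGroupId.
  public class PerWalGroupQueues {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    public void enqueueLog(String walName) {
      // rough stand-in for DefaultWALProvider.getWALPrefixFromWALName
      int dot = walName.lastIndexOf('.');
      String group = dot > 0 ? walName.substring(0, dot) : walName;
      queues.computeIfAbsent(group, g -> new LinkedBlockingQueue<String>()).add(walName);
      // in the real source, a ReplicationSourceWorkerThread is started lazily for each new group
    }
  }

In the patch itself the per-group consumer is the ReplicationSourceWorkerThread shown above, registered in workerThreads under its walGroupId by startup().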
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0c8f6f9..a8cffba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -23,9 +23,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -56,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -91,13 +95,14 @@ public class ReplicationSourceManager implements ReplicationListener {
// All about stopping
private final Server server;
// All logs we are currently tracking
- private final Map<String, SortedSet<String>> walsById;
+ // Index structure of the map is: peer_id->logPrefix/logGroup->logs
+ private final Map<String, Map<String, SortedSet<String>>> walsById;
// Logs for recovered sources we are currently tracking
- private final Map<String, SortedSet<String>> walsByIdRecoveredQueues;
+ private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
private final Configuration conf;
private final FileSystem fs;
- // The path to the latest log we saw, for new coming sources
- private Path latestPath;
+ // The paths to the latest log of each wal group, for new coming peers
+ private Set<Path> latestPaths;
// Path to the wals directories
private final Path logDir;
// Path to the wal archive
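The two-level index described in the comment above (peer_id -> wal group prefix -> WAL names) is easiest to picture with concrete values. A minimal sketch with hypothetical peer ids and WAL names, not taken from the patch:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.SortedSet;
  import java.util.TreeSet;

  public class WalsByIdShape {
    public static void main(String[] args) {
      // peer_id -> wal group prefix -> sorted WAL names (all values hypothetical)
      Map<String, Map<String, SortedSet<String>>> walsById = new HashMap<>();
      Map<String, SortedSet<String>> byGroup = new HashMap<>();
      SortedSet<String> group0 = new TreeSet<>();
      group0.add("rs1%2C16020%2C1446152214055.regiongroup-0.1446152864437");
      byGroup.put("rs1%2C16020%2C1446152214055.regiongroup-0", group0);
      walsById.put("1", byGroup); // peer id "1"
      System.out.println(walsById);
    }
  }

With the default WAL provider there is exactly one group per peer; with a multi-WAL provider each group contributes its own prefix and its own sorted set of logs.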
@@ -133,8 +138,8 @@ public class ReplicationSourceManager implements ReplicationListener {
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
- this.walsById = new HashMap<String, SortedSet<String>>();
- this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, SortedSet<String>>();
+ this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
+ this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
this.conf = conf;
this.fs = fs;
@@ -158,6 +163,7 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.rand = new Random();
+ this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
}
/**
@@ -189,15 +195,16 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered Whether this is a recovered queue
*/
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
- SortedSet<String> wals = walsByIdRecoveredQueues.get(id);
+ SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
if (wals != null && !wals.first().equals(key)) {
cleanOldLogs(wals, key, id);
}
} else {
synchronized (this.walsById) {
- SortedSet<String> wals = walsById.get(id);
- if (!wals.first().equals(key)) {
+ SortedSet<String> wals = walsById.get(id).get(logPrefix);
+ if (wals != null && !wals.first().equals(key)) {
cleanOldLogs(wals, key, id);
}
}
@@ -238,36 +245,44 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Add a new normal source to this region server
+ * Add sources for the given peer cluster on this region server. For the newly added peer, we only
+ * need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
* @return the source that was created
* @throws IOException
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
- ReplicationPeerConfig peerConfig
- = replicationPeers.getReplicationPeerConfig(id);
+ ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getPeer(id);
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
synchronized (this.walsById) {
this.sources.add(src);
- this.walsById.put(id, new TreeSet<String>());
+ Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+ this.walsById.put(id, walsByGroup);
// Add the latest wal to that source's queue
- if (this.latestPath != null) {
- String name = this.latestPath.getName();
- this.walsById.get(id).add(name);
- try {
- this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
- } catch (ReplicationException e) {
- String message =
- "Cannot add log to queue when creating a new source, queueId="
- + src.getPeerClusterZnode() + ", filename=" + name;
- server.stop(message);
- throw e;
+ synchronized (latestPaths) {
+ if (this.latestPaths.size() > 0) {
+ for (Path logPath : latestPaths) {
+ String name = logPath.getName();
+ String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
+ SortedSet<String> logs = new TreeSet<String>();
+ logs.add(name);
+ walsByGroup.put(walPrefix, logs);
+ try {
+ this.replicationQueues.addLog(id, name);
+ } catch (ReplicationException e) {
+ String message =
+ "Cannot add log to queue when creating a new source, queueId=" + id
+ + ", filename=" + name;
+ server.stop(message);
+ throw e;
+ }
+ src.enqueueLog(logPath);
+ }
}
- src.enqueueLog(this.latestPath);
}
}
src.startup();
@@ -302,7 +317,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the first source on this rs
* @return a sorted set of wal names
*/
- protected Map<String, SortedSet<String>> getWALs() {
+ protected Map<String, Map<String, SortedSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
@@ -310,7 +325,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get a copy of the wals of the recovered sources on this rs
* @return a sorted set of wal names
*/
- protected Map<String, SortedSet<String>> getWalsByIdRecoveredQueues() {
+ protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}
@@ -331,27 +346,70 @@ public class ReplicationSourceManager implements ReplicationListener {
}
void preLogRoll(Path newLog) throws IOException {
- synchronized (this.walsById) {
- String name = newLog.getName();
- for (ReplicationSourceInterface source : this.sources) {
+ recordLog(newLog);
+ String logName = newLog.getName();
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+ synchronized (latestPaths) {
+ Iterator<Path> iterator = latestPaths.iterator();
+ while (iterator.hasNext()) {
+ Path path = iterator.next();
+ if (path.getName().contains(logPrefix)) {
+ iterator.remove();
+ break;
+ }
+ }
+ this.latestPaths.add(newLog);
+ }
+ }
+
+ /**
+ * Check and enqueue the given log to the correct source. If there's still no source for the
+ * group to which the given log belongs, create one
+ * @param logPath the log path to check and enqueue
+ * @throws IOException
+ */
+ private void recordLog(Path logPath) throws IOException {
+ String logName = logPath.getName();
+ String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
+ // update replication queues on ZK
+ synchronized (replicationPeers) {// synchronize on replicationPeers to avoid adding source for
+ // the to-be-removed peer
+ for (String id : replicationPeers.getPeerIds()) {
try {
- this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
+ this.replicationQueues.addLog(id, logName);
} catch (ReplicationException e) {
- throw new IOException("Cannot add log to replication queue with id="
- + source.getPeerClusterZnode() + ", filename=" + name, e);
+ throw new IOException("Cannot add log to replication queue"
+ + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
}
}
- for (SortedSet<String> wals : this.walsById.values()) {
- if (this.sources.isEmpty()) {
- // If there's no slaves, don't need to keep the old wals since
- // we only consider the last one when a new slave comes in
- wals.clear();
+ }
+ // update walsById map
+ synchronized (walsById) {
+ for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
+ String peerId = entry.getKey();
+ Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
+ boolean existingPrefix = false;
+ for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
+ SortedSet<String> wals = walsEntry.getValue();
+ if (this.sources.isEmpty()) {
+ // If there are no slaves, we don't need to keep the old wals since
+ // we only consider the last one when a new slave comes in
+ wals.clear();
+ }
+ if (logPrefix.equals(walsEntry.getKey())) {
+ wals.add(logName);
+ existingPrefix = true;
+ }
+ }
+ if (!existingPrefix) {
+ // The new log belongs to a new group, add it into this peer
+ LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
+ SortedSet<String> wals = new TreeSet<String>();
+ wals.add(logName);
+ walsByPrefix.put(logPrefix, wals);
}
- wals.add(name);
}
}
-
- this.latestPath = newLog;
}
void postLogRoll(Path newLog) throws IOException {
@@ -376,7 +434,7 @@ public class ReplicationSourceManager implements ReplicationListener {
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Server server, final String peerId, final UUID clusterId,
final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
- throws IOException {
+ throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@@ -414,7 +472,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
} catch (Exception e) {
- LOG.warn("Passed replication endpoint implementation throws errors", e);
+ LOG.warn("Passed replication endpoint implementation throws errors"
+ + " while initializing ReplicationSource for peer: " + peerId, e);
throw new IOException(e);
}
@@ -470,7 +529,7 @@ public class ReplicationSourceManager implements ReplicationListener {
+ sources.size() + " and another "
+ oldsources.size() + " that were recovered");
String terminateMessage = "Replication stream was removed by a user";
- ReplicationSourceInterface srcToRemove = null;
+ List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
List<ReplicationSourceInterface> oldSourcesToDelete =
new ArrayList<ReplicationSourceInterface>();
// First close all the recovered sources for this peer
@@ -486,19 +545,23 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Number of deleted recovered sources for " + id + ": "
+ oldSourcesToDelete.size());
// Now look for the one on this cluster
- for (ReplicationSourceInterface src : this.sources) {
- if (id.equals(src.getPeerClusterId())) {
- srcToRemove = src;
- break;
+ synchronized (this.replicationPeers) {// synchronize on replicationPeers to avoid adding source
+ // for the to-be-removed peer
+ for (ReplicationSourceInterface src : this.sources) {
+ if (id.equals(src.getPeerClusterId())) {
+ srcToRemove.add(src);
+ }
}
+ if (srcToRemove.size() == 0) {
+ LOG.error("The queue we wanted to close is missing " + id);
+ return;
+ }
+ for (ReplicationSourceInterface toRemove : srcToRemove) {
+ toRemove.terminate(terminateMessage);
+ this.sources.remove(toRemove);
+ }
+ deleteSource(id, true);
}
- if (srcToRemove == null) {
- LOG.error("The queue we wanted to close is missing " + id);
- return;
- }
- srcToRemove.terminate(terminateMessage);
- this.sources.remove(srcToRemove);
- deleteSource(id, true);
}
@Override
@@ -580,6 +643,7 @@ public class ReplicationSourceManager implements ReplicationListener {
for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
String peerId = entry.getKey();
+ SortedSet<String> walsSet = entry.getValue();
try {
// there is not an actual peer defined corresponding to peerId for the failover.
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
@@ -596,7 +660,20 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
continue;
}
+ // track sources in walsByIdRecoveredQueues
+ Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
+ walsByIdRecoveredQueues.put(peerId, walsByGroup);
+ for (String wal : walsSet) {
+ String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
+ SortedSet<String> wals = walsByGroup.get(walPrefix);
+ if (wals == null) {
+ wals = new TreeSet<String>();
+ walsByGroup.put(walPrefix, wals);
+ }
+ wals.add(wal);
+ }
+ // enqueue sources
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
server, peerId, this.clusterId, peerConfig, peer);
@@ -605,12 +682,10 @@ public class ReplicationSourceManager implements ReplicationListener {
break;
}
oldsources.add(src);
- SortedSet<String> walsSet = entry.getValue();
for (String wal : walsSet) {
src.enqueueLog(new Path(oldLogDir, wal));
}
src.startup();
- walsByIdRecoveredQueues.put(peerId, walsSet);
} catch (IOException e) {
// TODO manage it
LOG.error("Failed creating a source", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index 44a033b..661016d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -366,4 +366,15 @@ public class DefaultWALProvider implements WALProvider {
}
}
+ /**
+ * Get the prefix of a log from its name, assuming the WAL name is in the format
+ * log_prefix.filenumber.log_suffix, as produced by {@link FSHLog#getCurrentFileName()}
+ * @param name Name of the WAL to parse
+ * @return prefix of the log
+ */
+ public static String getWALPrefixFromWALName(String name) {
+ int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf(".");
+ return name.substring(0, endIndex);
+ }
+
}
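As a quick illustration of the new helper, here is a hypothetical usage sketch (the WAL file name below is made up, following the host%2Cport%2Cstartcode.timestamp convention of FSHLog):

  import org.apache.hadoop.hbase.wal.DefaultWALProvider;

  public class WalPrefixExample {
    public static void main(String[] args) {
      // hypothetical WAL file name following the host%2Cport%2Cstartcode.timestamp convention
      String name = "rs1.example.org%2C16020%2C1446152214055.1446152864437";
      System.out.println(DefaultWALProvider.getWALPrefixFromWALName(name));
      // prints: rs1.example.org%2C16020%2C1446152214055
    }
  }

With a multi-WAL provider enabled (e.g. hbase.wal.provider=multiwal, using the constant made public in the WALFactory hunk below), each WAL group writes files with its own prefix, which is exactly the key ReplicationSourceManager now tracks per peer.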
http://git-wip-us.apache.org/repos/asf/hbase/blob/242adad5/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 4ef320a..e44a4d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -84,7 +84,7 @@ public class WALFactory {
}
}
- static final String WAL_PROVIDER = "hbase.wal.provider";
+ public static final String WAL_PROVIDER = "hbase.wal.provider";
static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name();
static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";