You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/06/01 06:56:02 UTC
hbase git commit: HBASE-18130 Refactor ReplicationSource
Repository: hbase
Updated Branches:
refs/heads/master db8ce0566 -> 123086eda
HBASE-18130 Refactor ReplicationSource
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/123086ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/123086ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/123086ed
Branch: refs/heads/master
Commit: 123086edad75308f8682a491ff8affa545babe4c
Parents: db8ce05
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun May 28 16:39:59 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Jun 1 14:50:45 2017 +0800
----------------------------------------------------------------------
.../RecoveredReplicationSource.java | 182 ++++++
...RecoveredReplicationSourceShipperThread.java | 147 +++++
.../regionserver/ReplicationSource.java | 626 ++++---------------
.../regionserver/ReplicationSourceFactory.java | 55 ++
.../ReplicationSourceInterface.java | 65 +-
.../regionserver/ReplicationSourceManager.java | 23 +-
.../ReplicationSourceShipperThread.java | 336 ++++++++++
.../ReplicationSourceWALReaderThread.java | 1 +
.../replication/ReplicationSourceDummy.java | 36 +-
.../TestReplicationSourceManager.java | 2 +-
10 files changed, 930 insertions(+), 543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
new file mode 100644
index 0000000..388b8d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+
+/**
+ * Class that handles the recovered source of a replication stream, which is transferred from
+ * another dead region server. This will be closed when all logs are pushed to the peer cluster.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSource extends ReplicationSource {
+
+ private static final Log LOG = LogFactory.getLog(RecoveredReplicationSource.class);
+
+ private String actualPeerId;
+
+ @Override
+ public void init(final Configuration conf, final FileSystem fs,
+ final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
+ final ReplicationPeers replicationPeers, final Stoppable stopper,
+ final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
+ final MetricsSource metrics) throws IOException { // Runs the superclass init unchanged, then records the peer id.
+ super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
+ clusterId, replicationEndpoint, metrics);
+ this.actualPeerId = this.replicationQueueInfo.getPeerId(); // the value getPeerId() returns for this source
+ }
+
+ @Override
+ protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) { // Starts a recovered-queue shipper for walGroupId unless one is already registered.
+ final RecoveredReplicationSourceShipperThread worker =
+ new RecoveredReplicationSourceShipperThread(conf, walGroupId, queue, this,
+ this.replicationQueues);
+ ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); // only the caller whose worker gets stored goes on to start it
+ if (extant != null) {
+ LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+ } else {
+ LOG.debug("Starting up worker for wal group " + walGroupId);
+ worker.startup(getUncaughtExceptionHandler());
+ worker.setWALReader( // worker.getStartPosition() below is evaluated before the reader is attached
+ startNewWALReaderThread(worker.getName(), walGroupId, queue, worker.getStartPosition()));
+ workerThreads.put(walGroupId, worker); // NOTE(review): looks redundant, putIfAbsent above already stored this worker - confirm
+ }
+ }
+
+ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException { // Re-resolves every WAL path in queue and, if any moved, rewrites queue in place.
+ boolean hasPathChanged = false; // set as soon as one queued path no longer exists where it was recorded
+ PriorityBlockingQueue<Path> newPaths =
+ new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
+ pathsLoop: for (Path path : queue) {
+ if (fs.exists(path)) { // still in same location, don't need to do anything
+ newPaths.add(path);
+ continue;
+ }
+ // Path changed - try to find the right path.
+ hasPathChanged = true;
+ if (stopper instanceof ReplicationSyncUp.DummyServer) { // presumably: running inside the ReplicationSyncUp tool - TODO confirm
+ // In the case of disaster/recovery, HMaster may have been shut down/crashed before flushing
+ // data from .logs to .oldlogs. Look through the .logs folders and check whether a match exists
+ Path newPath = getReplSyncUpPath(path);
+ newPaths.add(newPath); // getReplSyncUpPath hands back path itself when nothing matched
+ continue;
+ } else {
+ // See if Path exists in the dead RS folder (there could be a chain of failures
+ // to look at)
+ List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+ LOG.info("NB dead servers : " + deadRegionServers.size());
+ final Path walDir = FSUtils.getWALRootDir(conf);
+ for (String curDeadServerName : deadRegionServers) {
+ final Path deadRsDirectory =
+ new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
+ Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
+ deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; // two candidates: the dead server's directory and the same directory with SPLITTING_EXT appended
+ for (Path possibleLogLocation : locs) {
+ LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+ if (manager.getFs().exists(possibleLogLocation)) {
+ // We found the right new location
+ LOG.info("Log " + path + " still exists at " + possibleLogLocation);
+ newPaths.add(possibleLogLocation);
+ continue pathsLoop; // move on to the next queued path
+ }
+ }
+ }
+ // didn't find a new location
+ LOG.error(
+ String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
+ newPaths.add(path); // keep the stale path so newPaths stays the same size as queue
+ }
+ }
+
+ if (hasPathChanged) {
+ if (newPaths.size() != queue.size()) { // this shouldn't happen
+ LOG.error("Recovery queue size is incorrect");
+ throw new IOException("Recovery queue size error");
+ }
+ // put the correct locations in the queue
+ // since this is a recovered queue with no new incoming logs,
+ // there shouldn't be any concurrency issues
+ queue.clear();
+ for (Path path : newPaths) {
+ queue.add(path);
+ }
+ }
+ }
+
+ /**
+ * Searches every directory directly under {@code manager.getLogDir()} for a WAL whose file
+ * name equals the name of the given path.
+ * <p>
+ * N.B. the ReplicationSyncUp tool sets manager.getLogDir() to the root of the wal
+ * area rather than to the wal area for a particular region server.
+ * @param path the queued WAL path that no longer exists at its recorded location
+ * @return the matching path below one of the listed directories, or {@code path} itself when
+ * no file with that name is found
+ * @throws IOException if listing one of the directories fails
+ */
+ private Path getReplSyncUpPath(Path path) throws IOException {
+ final String walName = path.getName();
+ FileStatus[] rss = fs.listStatus(manager.getLogDir());
+ for (FileStatus rs : rss) {
+ final Path rsDir = rs.getPath();
+ FileStatus[] logs = fs.listStatus(rsDir);
+ for (FileStatus log : logs) {
+ // Always resolve the candidate against the server directory. Reusing a single variable
+ // for both would nest every later candidate under the previous file (dir/wal1/wal2).
+ final Path candidate = new Path(rsDir, log.getPath().getName());
+ if (candidate.getName().equals(walName)) {
+ LOG.info("Log " + candidate.getName() + " found at " + candidate);
+ return candidate;
+ }
+ }
+ }
+ LOG.error("Didn't find path for: " + walName);
+ return path;
+ }
+
+ public void tryFinish() {
+ // Serialize on workerThreads so that exactly one (the last) worker cleans up the queue.
+ synchronized (workerThreads) {
+ // Give the other worker threads a short moment to exit completely.
+ Threads.sleep(100);
+ for (ReplicationSourceShipperThread shipper : workerThreads.values()) {
+ if (shipper.isActive()) {
+ // Some wal group is still being shipped, so this is not the last worker.
+ return;
+ }
+ }
+ manager.closeRecoveredQueue(this);
+ LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
+ + getStats());
+ }
+ }
+
+ @Override
+ public String getPeerId() { // Returns the peer id that init() read from replicationQueueInfo.
+ return this.actualPeerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
new file mode 100644
index 0000000..024b0c4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipperThread.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Used by a {@link RecoveredReplicationSource}.
+ */
+@InterfaceAudience.Private
+public class RecoveredReplicationSourceShipperThread extends ReplicationSourceShipperThread {
+
+ private static final Log LOG = LogFactory.getLog(RecoveredReplicationSourceShipperThread.class);
+ protected final RecoveredReplicationSource source;
+ private final ReplicationQueues replicationQueues;
+
+ public RecoveredReplicationSourceShipperThread(Configuration conf, String walGroupId,
+ PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+ ReplicationQueues replicationQueues) { // Creates the shipper for one wal group of a recovered source.
+ super(conf, walGroupId, queue, source);
+ this.source = source; // kept with its concrete type; locateRecoveredPaths() and tryFinish() are called on it
+ this.replicationQueues = replicationQueues; // used by getRecoveredQueueStartPos() to read the stored log position
+ }
+
+ @Override
+ public void run() { // Ships batches taken from entryReader while this worker is active, then calls source.tryFinish().
+ // Loop until we close down
+ while (isActive()) {
+ int sleepMultiplier = 1; // declared inside the loop, so it starts again at 1 on every pass
+ // Sleep until replication is enabled again
+ if (!source.isPeerEnabled()) {
+ if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ while (entryReader == null) { // NOTE(review): this wait does not re-check isActive() - confirm the reader is always attached
+ if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
+ sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+
+ try {
+ WALEntryBatch entryBatch = entryReader.take();
+ shipEdits(entryBatch); // called for every batch, including the empty one checked below
+ if (entryBatch.getWalEntries().isEmpty()
+ && entryBatch.getLastSeqIds().isEmpty()) { // presumably the reader's signal that the recovered queue is drained - TODO confirm
+ LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ + source.getPeerClusterZnode());
+ source.getSourceMetrics().incrCompletedRecoveryQueue();
+ setWorkerRunning(false); // presumably makes isActive() return false so the loop ends - TODO confirm
+ continue;
+ }
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while waiting for next replication entry batch", e);
+ Thread.currentThread().interrupt(); // restore the interrupt flag that the exception cleared
+ }
+ }
+
+ source.tryFinish(); // lets the source close the recovered queue once no worker is active any more
+ }
+
+ /**
+ * Reads the stored position of the first queued log and then re-resolves the queued paths via
+ * {@link RecoveredReplicationSource#locateRecoveredPaths(PriorityBlockingQueue)}. The path
+ * lookup is attempted until it succeeds or has failed more than maxRetriesMultiplier times.
+ * @return the position obtained from getRecoveredQueueStartPos()
+ */
+ @Override
+ public long getStartPosition() {
+ long startPosition = getRecoveredQueueStartPos();
+ int numRetries = 0;
+ while (numRetries <= maxRetriesMultiplier) {
+ try {
+ source.locateRecoveredPaths(queue);
+ break;
+ } catch (IOException e) {
+ // Hand the exception to the logger so the reason for the failed lookup is not lost.
+ LOG.error("Error while locating recovered queue paths, attempt #" + numRetries, e);
+ numRetries++;
+ }
+ }
+ return startPosition;
+ }
+
+ /**
+ * Looks up the replication position stored for the first log of this recovered queue.
+ * <p>
+ * If this is a recovered queue, the queue is normally already full and the first log
+ * normally has a position (unless the RS failed between 2 logs).
+ * @return the stored position, or 0 when the queue holds no log or the lookup fails
+ */
+ private long getRecoveredQueueStartPos() {
+ long startPosition = 0;
+ String peerClusterZnode = source.getPeerClusterZnode();
+ // Peek only once: peek() returns null on an empty queue, and a single read guarantees that
+ // the lookup and the trace message refer to the same log.
+ Path firstLog = this.queue.peek();
+ if (firstLog == null) {
+ LOG.warn("Recovered queue " + peerClusterZnode + " has no log queued for wal group "
+ + this.walGroupId + ", starting at position " + startPosition);
+ return startPosition;
+ }
+ try {
+ startPosition = this.replicationQueues.getLogPosition(peerClusterZnode, firstLog.getName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Recovered queue started with log " + firstLog + " at position "
+ + startPosition);
+ }
+ } catch (ReplicationException e) {
+ terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
+ }
+ return startPosition;
+ }
+
+ @Override
+ protected void updateLogPosition(long lastReadPosition) { // Hands lastReadPosition for currentPath to the source manager.
+ source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
+ lastReadPosition, true, false); // NOTE(review): the meaning of the two boolean flags is not visible here - confirm against ReplicationSourceManager
+ lastLoggedPosition = lastReadPosition; // remember the position that was just reported
+ }
+
+ /**
+ * Stops this worker and, when one has already been attached, its WAL reader thread.
+ * @param reason human readable explanation that is written to the log
+ * @param cause the exception that triggered the shutdown, or null if it is not error driven
+ */
+ private void terminate(String reason, Exception cause) {
+ if (cause == null) {
+ LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
+ } else {
+ LOG.error("Closing worker for wal group " + this.walGroupId
+ + " because an error occurred: " + reason, cause);
+ }
+ // getStartPosition() is evaluated before setWALReader() is called, so a failure in there
+ // reaches this method while entryReader is still null.
+ if (entryReader != null) {
+ entryReader.interrupt();
+ Threads.shutdown(entryReader, sleepForRetries);
+ }
+ this.interrupt();
+ Threads.shutdown(this, sleepForRetries);
+ LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 72da9bd..b86f35f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,9 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@@ -42,19 +39,14 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -66,11 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -96,33 +84,30 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
- private int queueSizePerGroup;
- private ReplicationQueues replicationQueues;
+ protected int queueSizePerGroup;
+ protected ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
- private Configuration conf;
- private ReplicationQueueInfo replicationQueueInfo;
+ protected Configuration conf;
+ protected ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
- String actualPeerId;
// The manager of all sources to which we ping back our progress
- private ReplicationSourceManager manager;
+ protected ReplicationSourceManager manager;
// Should we stop everything?
- private Stoppable stopper;
+ protected Stoppable stopper;
// How long should we sleep for each retry
private long sleepForRetries;
- private FileSystem fs;
+ protected FileSystem fs;
// id of this cluster
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
- // total number of edits we replicated
- private AtomicLong totalReplicatedOperations = new AtomicLong(0);
// The znode we currently play with
- private String peerClusterZnode;
+ protected String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@@ -139,7 +124,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
- private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
+ new ConcurrentHashMap<>();
private AtomicLong totalBufferUsed;
@@ -182,8 +168,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
- this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
@@ -213,15 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
- final ReplicationSourceShipperThread worker =
- new ReplicationSourceShipperThread(logPrefix, queue, replicationQueueInfo, this);
- ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
- if (extant != null) {
- LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
- } else {
- LOG.debug("Starting up worker for wal group " + logPrefix);
- worker.startup();
- }
+ tryStartNewShipperThread(logPrefix, queue);
}
}
queue.put(log);
@@ -262,15 +238,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
- private void uninitialize() {
- LOG.debug("Source exiting " + this.peerId);
- metrics.clear();
- if (replicationEndpoint.state() == Service.State.STARTING
- || replicationEndpoint.state() == Service.State.RUNNING) {
- replicationEndpoint.stopAndWait();
- }
- }
-
@Override
public void run() {
// mark we are running now
@@ -322,15 +289,98 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
- final ReplicationSourceShipperThread worker =
- new ReplicationSourceShipperThread(walGroupId, queue, replicationQueueInfo, this);
- ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
- if (extant != null) {
- LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
- } else {
- LOG.debug("Starting up worker for wal group " + walGroupId);
- worker.startup();
+ tryStartNewShipperThread(walGroupId, queue);
+ }
+ }
+
+ protected void tryStartNewShipperThread(String walGroupId, PriorityBlockingQueue<Path> queue) { // Starts a shipper for walGroupId unless one is already registered in workerThreads.
+ final ReplicationSourceShipperThread worker = new ReplicationSourceShipperThread(conf,
+ walGroupId, queue, this);
+ ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker); // only the caller whose worker gets stored goes on to start it
+ if (extant != null) {
+ LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
+ } else {
+ LOG.debug("Starting up worker for wal group " + walGroupId);
+ worker.startup(getUncaughtExceptionHandler());
+ worker.setWALReader(startNewWALReaderThread(worker.getName(), walGroupId, queue,
+ worker.getStartPosition())); // the reader is attached only after the worker has been started
+ workerThreads.put(walGroupId, worker); // NOTE(review): looks redundant, putIfAbsent above already stored this worker - confirm
+ }
+ }
+
+ protected ReplicationSourceWALReaderThread startNewWALReaderThread(String threadName,
+ String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { // Creates the WAL reader for queue, starts it through Threads.setDaemonThreadRunning and returns it.
+ ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
+ new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); // the source's own filter followed by a cluster-marking filter
+ ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
+ ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
+ replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
+ Threads.setDaemonThreadRunning(walReader, threadName
+ + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
+ getUncaughtExceptionHandler()); // thread name = threadName + fixed infix + wal group id + "," + queue znode
+ return walReader;
+ }
+
+ public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { // Builds a new handler instance on every call.
+ return new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ RSRpcServices.exitIfOOME(e); // presumably exits the process on OutOfMemoryError - TODO confirm
+ LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+ stopper.stop("Unexpected exception in " + t.getName()); // any uncaught exception asks the Stoppable to stop
+ }
+ };
+ }
+
+ @Override
+ public ReplicationEndpoint getReplicationEndpoint() { // Returns the endpoint held in the replicationEndpoint field.
+ return this.replicationEndpoint;
+ }
+
+ @Override
+ public ReplicationSourceManager getSourceManager() { // Returns the manager held in the manager field.
+ return this.manager;
+ }
+
+ @Override
+ public void tryThrottle(int batchSize) throws InterruptedException {
+ // Pick up a changed peer bandwidth before the throttler is consulted.
+ checkBandwidthChangeAndResetThrottler();
+ if (!throttler.isEnabled()) {
+ return;
+ }
+ long pauseMs = throttler.getNextSleepInterval(batchSize);
+ if (pauseMs <= 0) {
+ // Nothing to wait for with this batch size.
+ return;
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("To sleep " + pauseMs + "ms for throttling control");
+ }
+ Thread.sleep(pauseMs);
+ // A throttling sleep happened, so the throttler's cycle start tick is reset.
+ throttler.resetStartTick();
+ }
+
+ private void checkBandwidthChangeAndResetThrottler() {
+ long latestBandwidth = getCurrentBandwidth();
+ if (latestBandwidth == currentBandwidth) {
+ // Bandwidth is unchanged, the throttler keeps its current setting.
+ return;
+ }
+ currentBandwidth = latestBandwidth;
+ // The throttler is given one tenth of the configured bandwidth.
+ throttler.setBandwidth((double) currentBandwidth / 10.0);
+ LOG.info("ReplicationSource : " + peerId
+ + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+ }
+
+ private long getCurrentBandwidth() {
+ ReplicationPeer connectedPeer = this.replicationPeers.getConnectedPeer(peerId);
+ if (connectedPeer != null) {
+ long configured = connectedPeer.getPeerBandwidth();
+ if (configured != 0) {
+ return configured;
+ }
+ }
+ // No connected peer, or the user set the peer bandwidth to 0 to use the default bandwidth.
+ return defaultBandwidth;
+ }
+
+ private void uninitialize() { // Clears the metrics and stops the replication endpoint if it is starting or running.
+ LOG.debug("Source exiting " + this.peerId);
+ metrics.clear();
+ if (replicationEndpoint.state() == Service.State.STARTING
+ || replicationEndpoint.state() == Service.State.RUNNING) { // state() is read once per comparison
+ replicationEndpoint.stopAndWait();
}
}
@@ -358,7 +408,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*
* @return true if the peer is enabled, otherwise false
*/
- protected boolean isPeerEnabled() {
+ @Override
+ public boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
@@ -428,7 +479,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
@Override
- public String getPeerClusterId() {
+ public String getPeerId() {
return this.peerId;
}
@@ -441,7 +492,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return null;
}
- private boolean isSourceActive() {
+ @Override
+ public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
@@ -492,467 +544,17 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* Get Replication Source Metrics
* @return sourceMetrics
*/
+ @Override
public MetricsSource getSourceMetrics() {
return this.metrics;
}
- private long getCurrentBandwidth() {
- ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId);
- long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
- // user can set peer bandwidth to 0 to use default bandwidth
- return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
- }
-
- // This thread reads entries from a queue and ships them.
- // Entries are placed onto the queue by ReplicationSourceWALReaderThread
- public class ReplicationSourceShipperThread extends Thread {
- ReplicationSourceInterface source;
- String walGroupId;
- PriorityBlockingQueue<Path> queue;
- ReplicationQueueInfo replicationQueueInfo;
- // Last position in the log that we sent to ZooKeeper
- private long lastLoggedPosition = -1;
- // Path of the current log
- private volatile Path currentPath;
- // Indicates whether this particular worker is running
- private boolean workerRunning = true;
- ReplicationSourceWALReaderThread entryReader;
- // Use guava cache to set ttl for each key
- private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
- .expireAfterAccess(1, TimeUnit.DAYS).build(
- new CacheLoader<String, Boolean>() {
- @Override
- public Boolean load(String key) throws Exception {
- return false;
- }
- }
- );
-
- public ReplicationSourceShipperThread(String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
- ReplicationSourceInterface source) {
- this.walGroupId = walGroupId;
- this.queue = queue;
- this.replicationQueueInfo = replicationQueueInfo;
- this.source = source;
- }
-
- @Override
- public void run() {
- // Loop until we close down
- while (isWorkerActive()) {
- int sleepMultiplier = 1;
- // Sleep until replication is enabled again
- if (!isPeerEnabled()) {
- if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
- sleepMultiplier++;
- }
- continue;
- }
- while (entryReader == null) {
- if (sleepForRetries("Replication WAL entry reader thread not initialized",
- sleepMultiplier)) {
- sleepMultiplier++;
- }
- if (sleepMultiplier == maxRetriesMultiplier) {
- LOG.warn("Replication WAL entry reader thread not initialized");
- }
- }
-
- try {
- WALEntryBatch entryBatch = entryReader.take();
- for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
- waitingUntilCanPush(entry);
- }
- shipEdits(entryBatch);
- releaseBufferQuota((int) entryBatch.getHeapSize());
- if (replicationQueueInfo.isQueueRecovered() && entryBatch.getWalEntries().isEmpty()
- && entryBatch.getLastSeqIds().isEmpty()) {
- LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
- + peerClusterZnode);
- metrics.incrCompletedRecoveryQueue();
- setWorkerRunning(false);
- continue;
- }
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while waiting for next replication entry batch", e);
- Thread.currentThread().interrupt();
- }
- }
-
- if (replicationQueueInfo.isQueueRecovered()) {
- // use synchronize to make sure one last thread will clean the queue
- synchronized (workerThreads) {
- Threads.sleep(100);// wait a short while for other worker thread to fully exit
- boolean allOtherTaskDone = true;
- for (ReplicationSourceShipperThread worker : workerThreads.values()) {
- if (!worker.equals(this) && worker.isAlive()) {
- allOtherTaskDone = false;
- break;
- }
- }
- if (allOtherTaskDone) {
- manager.closeRecoveredQueue(this.source);
- LOG.info("Finished recovering queue " + peerClusterZnode
- + " with the following stats: " + getStats());
- }
- }
- }
- }
-
- private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
- String key = entry.getKey();
- long seq = entry.getValue();
- boolean deleteKey = false;
- if (seq <= 0) {
- // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
- deleteKey = true;
- seq = -seq;
- }
-
- if (!canSkipWaitingSet.getUnchecked(key)) {
- try {
- manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
- } catch (IOException e) {
- LOG.error("waitUntilCanBePushed fail", e);
- stopper.stop("waitUntilCanBePushed fail");
- } catch (InterruptedException e) {
- LOG.warn("waitUntilCanBePushed interrupted", e);
- Thread.currentThread().interrupt();
- }
- canSkipWaitingSet.put(key, true);
- }
- if (deleteKey) {
- canSkipWaitingSet.invalidate(key);
- }
- }
-
- private void cleanUpHFileRefs(WALEdit edit) throws IOException {
- String peerId = peerClusterZnode;
- if (peerId.contains("-")) {
- // peerClusterZnode will be in the form peerId + "-" + rsZNode.
- // A peerId will not have "-" in its name, see HBASE-11394
- peerId = peerClusterZnode.split("-")[0];
- }
- List<Cell> cells = edit.getCells();
- int totalCells = cells.size();
- for (int i = 0; i < totalCells; i++) {
- Cell cell = cells.get(i);
- if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- List<StoreDescriptor> stores = bld.getStoresList();
- int totalStores = stores.size();
- for (int j = 0; j < totalStores; j++) {
- List<String> storeFileList = stores.get(j).getStoreFileList();
- manager.cleanUpHFileRefs(peerId, storeFileList);
- metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
- }
- }
- }
- }
-
- private void checkBandwidthChangeAndResetThrottler() {
- long peerBandwidth = getCurrentBandwidth();
- if (peerBandwidth != currentBandwidth) {
- currentBandwidth = peerBandwidth;
- throttler.setBandwidth((double) currentBandwidth / 10.0);
- LOG.info("ReplicationSource : " + peerId
- + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
- }
- }
-
- /**
- * Do the shipping logic
- */
- protected void shipEdits(WALEntryBatch entryBatch) {
- List<Entry> entries = entryBatch.getWalEntries();
- long lastReadPosition = entryBatch.getLastWalPosition();
- currentPath = entryBatch.getLastWalPath();
- int sleepMultiplier = 0;
- if (entries.isEmpty()) {
- if (lastLoggedPosition != lastReadPosition) {
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
- updateLogPosition(lastReadPosition);
- // if there was nothing to ship and it's not an error
- // set "ageOfLastShippedOp" to <now> to indicate that we're current
- metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
- }
- return;
- }
- int currentSize = (int) entryBatch.getHeapSize();
- while (isWorkerActive()) {
- try {
- checkBandwidthChangeAndResetThrottler();
- if (throttler.isEnabled()) {
- long sleepTicks = throttler.getNextSleepInterval(currentSize);
- if (sleepTicks > 0) {
- try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
- }
- Thread.sleep(sleepTicks);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping for throttling control");
- Thread.currentThread().interrupt();
- // current thread might be interrupted to terminate
- // directly go back to while() for confirm this
- continue;
- }
- // reset throttler's cycle start tick when sleep for throttling occurs
- throttler.resetStartTick();
- }
- }
- // create replicateContext here, so the entries can be GC'd upon return from this call
- // stack
- ReplicationEndpoint.ReplicateContext replicateContext =
- new ReplicationEndpoint.ReplicateContext();
- replicateContext.setEntries(entries).setSize(currentSize);
- replicateContext.setWalGroupId(walGroupId);
-
- long startTimeNs = System.nanoTime();
- // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
- boolean replicated = replicationEndpoint.replicate(replicateContext);
- long endTimeNs = System.nanoTime();
-
- if (!replicated) {
- continue;
- } else {
- sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
- }
-
- if (this.lastLoggedPosition != lastReadPosition) {
- //Clean up hfile references
- int size = entries.size();
- for (int i = 0; i < size; i++) {
- cleanUpHFileRefs(entries.get(i).getEdit());
- }
-
- // Save positions to meta table before zk.
- updateSerialRepPositions(entryBatch.getLastSeqIds());
-
- //Log and clean up WAL logs
- updateLogPosition(lastReadPosition);
- }
- if (throttler.isEnabled()) {
- throttler.addPushSize(currentSize);
- }
- totalReplicatedEdits.addAndGet(entries.size());
- totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
- // FIXME check relationship between wal group and overall
- metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
- metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
- walGroupId);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
- + totalReplicatedOperations + " operations in "
- + ((endTimeNs - startTimeNs) / 1000000) + " ms");
- }
- break;
- } catch (Exception ex) {
- LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
- + org.apache.hadoop.util.StringUtils.stringifyException(ex));
- if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- }
-
- private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
- try {
- MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
- lastPositionsForSerialScope);
- } catch (IOException e) {
- LOG.error("updateReplicationPositions fail", e);
- stopper.stop("updateReplicationPositions fail");
- }
- }
-
- private void updateLogPosition(long lastReadPosition) {
- manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
- this.replicationQueueInfo.isQueueRecovered(), false);
- lastLoggedPosition = lastReadPosition;
- }
-
- public void startup() {
- String n = Thread.currentThread().getName();
- Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
- + getCurrentPath(), e);
- stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
- }
- };
- Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
- + peerClusterZnode, handler);
- workerThreads.put(walGroupId, this);
-
- long startPosition = 0;
-
- if (this.replicationQueueInfo.isQueueRecovered()) {
- startPosition = getRecoveredQueueStartPos(startPosition);
- int numRetries = 0;
- while (numRetries <= maxRetriesMultiplier) {
- try {
- locateRecoveredPaths();
- break;
- } catch (IOException e) {
- LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
- numRetries++;
- }
- }
- }
-
- startWALReaderThread(n, handler, startPosition);
- }
-
- // If this is a recovered queue, the queue is already full and the first log
- // normally has a position (unless the RS failed between 2 logs)
- private long getRecoveredQueueStartPos(long startPosition) {
- try {
- startPosition =
- (replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName()));
- if (LOG.isTraceEnabled()) {
- LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
- + startPosition);
- }
- } catch (ReplicationException e) {
- terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
- }
- return startPosition;
- }
-
- // start a background thread to read and batch entries
- private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
- long startPosition) {
- ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
- new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
- ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
- entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
- startPosition, fs, conf, readerFilter, metrics);
- Threads.setDaemonThreadRunning(entryReader, threadName
- + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
- handler);
- }
-
- // Loops through the recovered queue and tries to find the location of each log
- // this is necessary because the logs may have moved before recovery was initiated
- private void locateRecoveredPaths() throws IOException {
- boolean hasPathChanged = false;
- PriorityBlockingQueue<Path> newPaths =
- new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
- pathsLoop: for (Path path : queue) {
- if (fs.exists(path)) { // still in same location, don't need to do anything
- newPaths.add(path);
- continue;
- }
- // Path changed - try to find the right path.
- hasPathChanged = true;
- if (stopper instanceof ReplicationSyncUp.DummyServer) {
- // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
- // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
- Path newPath = getReplSyncUpPath(path);
- newPaths.add(newPath);
- continue;
- } else {
- // See if Path exists in the dead RS folder (there could be a chain of failures
- // to look at)
- List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
- LOG.info("NB dead servers : " + deadRegionServers.size());
- final Path walDir = FSUtils.getWALRootDir(conf);
- for (String curDeadServerName : deadRegionServers) {
- final Path deadRsDirectory =
- new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
- Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
- deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
- for (Path possibleLogLocation : locs) {
- LOG.info("Possible location " + possibleLogLocation.toUri().toString());
- if (manager.getFs().exists(possibleLogLocation)) {
- // We found the right new location
- LOG.info("Log " + path + " still exists at " + possibleLogLocation);
- newPaths.add(possibleLogLocation);
- continue pathsLoop;
- }
- }
- }
- // didn't find a new location
- LOG.error(
- String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
- newPaths.add(path);
- }
- }
-
- if (hasPathChanged) {
- if (newPaths.size() != queue.size()) { // this shouldn't happen
- LOG.error("Recovery queue size is incorrect");
- throw new IOException("Recovery queue size error");
- }
- // put the correct locations in the queue
- // since this is a recovered queue with no new incoming logs,
- // there shouldn't be any concurrency issues
- queue.clear();
- for (Path path : newPaths) {
- queue.add(path);
- }
- }
- }
-
- // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
- // area rather than to the wal area for a particular region server.
- private Path getReplSyncUpPath(Path path) throws IOException {
- FileStatus[] rss = fs.listStatus(manager.getLogDir());
- for (FileStatus rs : rss) {
- Path p = rs.getPath();
- FileStatus[] logs = fs.listStatus(p);
- for (FileStatus log : logs) {
- p = new Path(p, log.getPath().getName());
- if (p.getName().equals(path.getName())) {
- LOG.info("Log " + p.getName() + " found at " + p);
- return p;
- }
- }
- }
- LOG.error("Didn't find path for: " + path.getName());
- return path;
- }
-
- public Path getCurrentPath() {
- return this.currentPath;
- }
-
- public long getCurrentPosition() {
- return this.lastLoggedPosition;
- }
-
- private boolean isWorkerActive() {
- return !stopper.isStopped() && workerRunning && !isInterrupted();
- }
-
- private void terminate(String reason, Exception cause) {
- if (cause == null) {
- LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
-
- } else {
- LOG.error("Closing worker for wal group " + this.walGroupId
- + " because an error occurred: " + reason, cause);
- }
- entryReader.interrupt();
- Threads.shutdown(entryReader, sleepForRetries);
- this.interrupt();
- Threads.shutdown(this, sleepForRetries);
- LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
- }
-
- public void setWorkerRunning(boolean workerRunning) {
- entryReader.setReaderRunning(workerRunning);
- this.workerRunning = workerRunning;
- }
-
- private void releaseBufferQuota(int size) {
- totalBufferUsed.addAndGet(-size);
+ @Override
+ public void postShipEdits(List<Entry> entries, int batchSize) {
+ if (throttler.isEnabled()) {
+ throttler.addPushSize(batchSize);
}
+ totalReplicatedEdits.addAndGet(entries.size());
+ totalBufferUsed.addAndGet(-batchSize);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
new file mode 100644
index 0000000..ee01773
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+
+/**
+ * Constructs a {@link ReplicationSourceInterface}
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceFactory {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSourceFactory.class);
+
+ static ReplicationSourceInterface create(Configuration conf, String peerId) {
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+ boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
+ ReplicationSourceInterface src;
+ try {
+ String defaultReplicationSourceImpl =
+ isQueueRecovered ? RecoveredReplicationSource.class.getCanonicalName()
+ : ReplicationSource.class.getCanonicalName();
+ @SuppressWarnings("rawtypes")
+ Class c = Class.forName(
+ conf.get("replication.replicationsource.implementation", defaultReplicationSourceImpl));
+ src = (ReplicationSourceInterface) c.newInstance();
+ } catch (Exception e) {
+ LOG.warn("Passed replication source implementation throws errors, "
+ + "defaulting to ReplicationSource",
+ e);
+ src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource();
+ }
+ return src;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 8d5451c..4912948 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Interface that defines a replication source
@@ -65,10 +66,15 @@ public interface ReplicationSourceInterface {
void enqueueLog(Path log);
/**
- * Get the current log that's replicated
- * @return the current log
+ * Add hfile names to the queue to be replicated.
+ * @param tableName Name of the table these files belong to
+ * @param family Name of the family these files belong to
+ * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+ * will be added in the queue for replication}
+ * @throws ReplicationException If failed to add hfile references
*/
- Path getCurrentPath();
+ void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
+ throws ReplicationException;
/**
* Start the replication
@@ -89,6 +95,12 @@ public interface ReplicationSourceInterface {
void terminate(String reason, Exception cause);
/**
+ * Get the current log that's replicated
+ * @return the current log
+ */
+ Path getCurrentPath();
+
+ /**
* Get the id that the source is replicating to
*
* @return peer cluster id
@@ -98,9 +110,9 @@ public interface ReplicationSourceInterface {
/**
* Get the id that the source is replicating to.
*
- * @return peer cluster id
+ * @return peer id
*/
- String getPeerClusterId();
+ String getPeerId();
/**
* Get a string representation of the current statistics
@@ -110,14 +122,41 @@ public interface ReplicationSourceInterface {
String getStats();
/**
- * Add hfile names to the queue to be replicated.
- * @param tableName Name of the table these files belongs to
- * @param family Name of the family these files belong to
- * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
- * will be added in the queue for replication}
- * @throws ReplicationException If failed to add hfile references
+ * @return peer enabled or not
*/
- void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
- throws ReplicationException;
+ boolean isPeerEnabled();
+ /**
+ * @return active or not
+ */
+ boolean isSourceActive();
+
+ /**
+ * @return metrics of this replication source
+ */
+ MetricsSource getSourceMetrics();
+
+ /**
+ * @return the replication endpoint used by this replication source
+ */
+ ReplicationEndpoint getReplicationEndpoint();
+
+ /**
+ * @return the replication source manager
+ */
+ ReplicationSourceManager getSourceManager();
+
+ /**
+ * Try to throttle when the peer is configured with a bandwidth
+ * @param batchSize size of the entries that will be pushed
+ * @throws InterruptedException
+ */
+ void tryThrottle(int batchSize) throws InterruptedException;
+
+ /**
+ * Call this after the shipper thread ships some entries to the peer cluster.
+ * @param entries the entries that were pushed
+ * @param batchSize size of the entries that were pushed
+ */
+ void postShipEdits(List<Entry> entries, int batchSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a38e264..cb631c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -464,17 +464,8 @@ public class ReplicationSourceManager implements ReplicationListener {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
- ReplicationSourceInterface src;
- try {
- @SuppressWarnings("rawtypes")
- Class c = Class.forName(conf.get("replication.replicationsource.implementation",
- ReplicationSource.class.getCanonicalName()));
- src = (ReplicationSourceInterface) c.newInstance();
- } catch (Exception e) {
- LOG.warn("Passed replication source implementation throws errors, " +
- "defaulting to ReplicationSource", e);
- src = new ReplicationSource();
- }
+
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
ReplicationEndpoint replicationEndpoint = null;
try {
@@ -575,7 +566,7 @@ public class ReplicationSourceManager implements ReplicationListener {
synchronized (oldsources) {
// First close all the recovered sources for this peer
for (ReplicationSourceInterface src : oldsources) {
- if (id.equals(src.getPeerClusterId())) {
+ if (id.equals(src.getPeerId())) {
oldSourcesToDelete.add(src);
}
}
@@ -591,7 +582,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronize on replicationPeers to avoid adding source for the to-be-removed peer
synchronized (this.replicationPeers) {
for (ReplicationSourceInterface src : this.sources) {
- if (id.equals(src.getPeerClusterId())) {
+ if (id.equals(src.getPeerId())) {
srcToRemove.add(src);
}
}
@@ -752,7 +743,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
- if (!this.rp.getConnectedPeerIds().contains(src.getPeerClusterId())) {
+ if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src);
continue;
@@ -834,11 +825,11 @@ public class ReplicationSourceManager implements ReplicationListener {
public String getStats() {
StringBuffer stats = new StringBuffer();
for (ReplicationSourceInterface source : sources) {
- stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
+ stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
}
for (ReplicationSourceInterface oldSource : oldsources) {
- stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
+ stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId()+": ");
stats.append(oldSource.getStats()+ "\n");
}
return stats.toString();
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
new file mode 100644
index 0000000..b0f7fee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -0,0 +1,336 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * This thread reads entries from a queue and ships them. Entries are placed onto the queue by
+ * ReplicationSourceWALReaderThread
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceShipperThread extends Thread {
+ private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
+
+ protected final Configuration conf;
+ protected final String walGroupId;
+ protected final PriorityBlockingQueue<Path> queue;
+ protected final ReplicationSourceInterface source;
+
+ // Last position in the log that we sent to ZooKeeper
+ protected long lastLoggedPosition = -1;
+ // Path of the current log
+ protected volatile Path currentPath;
+ // Indicates whether this particular worker is running. Volatile because the flag is flipped
+ // through setWorkerRunning() by a thread other than the shipper and polled from isActive().
+ private volatile boolean workerRunning = true;
+ // Supplies the batches to ship. Volatile because run() polls this field until some other
+ // thread installs the reader through setWALReader().
+ protected volatile ReplicationSourceWALReaderThread entryReader;
+
+ // How long should we sleep for each retry
+ protected final long sleepForRetries;
+ // Maximum number of retries before taking bold actions
+ protected final int maxRetriesMultiplier;
+
+ // Use guava cache to set ttl for each key. A key that was never stored loads as FALSE,
+ // i.e. "waiting can not be skipped"; entries expire one day after their last access.
+ private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
+     .expireAfterAccess(1, TimeUnit.DAYS).build(
+     new CacheLoader<String, Boolean>() {
+       @Override
+       public Boolean load(String key) throws Exception {
+         return false;
+       }
+     }
+ );
+
+ /**
+  * Creates a shipper for one WAL group of a replication source. The thread is not started
+  * here.
+  * @param conf configuration the retry settings are read from
+  * @param walGroupId identifier of the WAL group this shipper serves
+  * @param queue queue of WAL paths, exposed through {@link #getLogQueue()}
+  * @param source the replication source this shipper belongs to
+  */
+ public ReplicationSourceShipperThread(Configuration conf, String walGroupId,
+     PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
+   this.source = source;
+   this.queue = queue;
+   this.walGroupId = walGroupId;
+   this.conf = conf;
+   // Base pause between retries in milliseconds; defaults to 1 second.
+   long retryPauseMs = conf.getLong("replication.source.sleepforretries", 1000);
+   // Upper bound for the back-off multiplier; the default of 300 equals 5 minutes at the
+   // default 1 second pause.
+   int retryMultiplierCap = conf.getInt("replication.source.maxretriesmultiplier", 300);
+   this.sleepForRetries = retryPauseMs;
+   this.maxRetriesMultiplier = retryMultiplierCap;
+ }
+
+ /**
+  * Main loop of the shipper: while the worker is active, take the next batch from the WAL
+  * reader, wait until every sequence id in the batch may be pushed, then ship the batch.
+  */
+ @Override
+ public void run() {
+   // Loop until we close down
+   while (isActive()) {
+     // Sleep until replication is enabled again. The multiplier stays at 1: the previous code
+     // incremented a counter that was re-created on every iteration, so it never took effect.
+     if (!source.isPeerEnabled()) {
+       sleepForRetries("Replication is disabled", 1);
+       continue;
+     }
+
+     // Wait for the WAL reader to be installed, but stop waiting once this worker is no longer
+     // active. Without that check an interrupt turns the wait into a busy loop that never
+     // exits, because sleepForRetries() restores the interrupt flag and every following sleep
+     // then fails immediately.
+     int sleepMultiplier = 1;
+     ReplicationSourceWALReaderThread reader = entryReader;
+     while (reader == null && isActive()) {
+       if (sleepForRetries("Replication WAL entry reader thread not initialized",
+           sleepMultiplier)) {
+         sleepMultiplier++;
+       }
+       reader = entryReader;
+     }
+     if (reader == null) {
+       // Gave up waiting; let the outer loop condition decide whether to terminate.
+       continue;
+     }
+
+     try {
+       WALEntryBatch entryBatch = reader.take();
+       for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
+         waitingUntilCanPush(entry);
+       }
+       shipEdits(entryBatch);
+     } catch (InterruptedException e) {
+       LOG.trace("Interrupted while waiting for next replication entry batch", e);
+       // Restore the flag so that isActive() turns false and the loop ends.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ /**
+ * Do the shipping logic: hand the entries of one batch to the replication endpoint, retrying
+ * while this worker is active until the endpoint reports success, then persist the new
+ * positions and update the source metrics.
+ * @param entryBatch the batch to ship; when it holds no entries only the positions are saved
+ */
+ protected void shipEdits(WALEntryBatch entryBatch) {
+ List<Entry> entries = entryBatch.getWalEntries();
+ long lastReadPosition = entryBatch.getLastWalPosition();
+ currentPath = entryBatch.getLastWalPath();
+ // Starts at 0, so the first sleep after a failure lasts 0 ms
+ int sleepMultiplier = 0;
+ if (entries.isEmpty()) {
+ // Nothing to ship: only record the position if it moved since the last update
+ if (lastLoggedPosition != lastReadPosition) {
+ // Save positions to meta table before zk.
+ updateSerialRepPositions(entryBatch.getLastSeqIds());
+ updateLogPosition(lastReadPosition);
+ // if there was nothing to ship and it's not an error
+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
+ source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
+ walGroupId);
+ }
+ return;
+ }
+ // NOTE(review): the heap size is narrowed to int here; presumably a batch never approaches
+ // Integer.MAX_VALUE bytes -- confirm against the reader's batch size limit.
+ int currentSize = (int) entryBatch.getHeapSize();
+ while (isActive()) {
+ try {
+ try {
+ source.tryThrottle(currentSize);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping for throttling control");
+ Thread.currentThread().interrupt();
+ // current thread might be interrupted to terminate
+ // directly go back to while() for confirm this
+ continue;
+ }
+
+ // create replicateContext here, so the entries can be GC'd upon return from this call
+ // stack
+ ReplicationEndpoint.ReplicateContext replicateContext =
+ new ReplicationEndpoint.ReplicateContext();
+ replicateContext.setEntries(entries).setSize(currentSize);
+ replicateContext.setWalGroupId(walGroupId);
+
+ long startTimeNs = System.nanoTime();
+ // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
+ boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
+ long endTimeNs = System.nanoTime();
+
+ if (!replicated) {
+ // The endpoint did not ship the batch: retry right away, no sleep happens on this path
+ continue;
+ } else {
+ // Success: step the back-off multiplier down by one, never below 0
+ sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
+ }
+
+ if (this.lastLoggedPosition != lastReadPosition) {
+ //Clean up hfile references
+ int size = entries.size();
+ for (int i = 0; i < size; i++) {
+ cleanUpHFileRefs(entries.get(i).getEdit());
+ }
+
+ // Save positions to meta table before zk.
+ updateSerialRepPositions(entryBatch.getLastSeqIds());
+ //Log and clean up WAL logs
+ updateLogPosition(lastReadPosition);
+ }
+
+ source.postShipEdits(entries, currentSize);
+ // FIXME check relationship between wal group and overall
+ source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
+ entryBatch.getNbHFiles());
+ // The age metric is derived from the write time of the last entry in the batch
+ source.getSourceMetrics().setAgeOfLastShippedOp(
+ entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations()
+ + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms");
+ }
+ break;
+ } catch (Exception ex) {
+ // Any exception thrown inside the try lands here, including the RuntimeException raised by
+ // updateSerialRepPositions after a successful replicate(); the loop then sleeps and calls
+ // replicate() again with the same entries.
+ LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
+ + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+ if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ }
+
+ /**
+  * Asks the source manager to wait until the given key may be pushed to this peer up to the
+  * given sequence id, unless an earlier call already recorded that waiting can be skipped for
+  * the key.
+  * @param entry key and last sequence id from the batch; a non-positive sequence id signals a
+  *          REGION_CLOSE marker, in which case its absolute value is used for the wait and the
+  *          key is forgotten afterwards
+  * @throws RuntimeException wrapping the IOException if the source manager fails
+  */
+ private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+   String key = entry.getKey();
+   long seq = entry.getValue();
+   boolean deleteKey = false;
+   if (seq <= 0) {
+     // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
+     deleteKey = true;
+     seq = -seq;
+   }
+
+   if (!canSkipWaitingSet.getUnchecked(key)) {
+     try {
+       source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq,
+         source.getPeerId());
+       // Only remember the key once the wait really completed; an interrupted wait proves
+       // nothing about whether the key may be pushed.
+       canSkipWaitingSet.put(key, true);
+     } catch (IOException e) {
+       LOG.error("waitUntilCanBePushed fail", e);
+       // Keep the cause so the failure can be diagnosed from the rethrown exception.
+       throw new RuntimeException("waitUntilCanBePushed fail", e);
+     } catch (InterruptedException e) {
+       LOG.warn("waitUntilCanBePushed interrupted", e);
+       Thread.currentThread().interrupt();
+     }
+   }
+   if (deleteKey) {
+     canSkipWaitingSet.invalidate(key);
+   }
+ }
+
+ /**
+  * Releases the hfile references of every bulk load descriptor found in the given edit and
+  * lowers the hfile-refs queue size metric by the number of store files released.
+  * @param edit the WAL edit whose cells are scanned for bulk load markers
+  * @throws IOException declared for the bulk load descriptor parsing and clean-up calls
+  */
+ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+   String rawPeerId = source.getPeerId();
+   // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+   // A peerId will not have "-" in its name, see HBASE-11394
+   String peerId = rawPeerId.contains("-") ? rawPeerId.split("-")[0] : rawPeerId;
+   for (Cell cell : edit.getCells()) {
+     if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+       // Not a bulk load marker, nothing to release for this cell.
+       continue;
+     }
+     BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
+     for (StoreDescriptor store : descriptor.getStoresList()) {
+       List<String> storeFiles = store.getStoreFileList();
+       source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
+       source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
+     }
+   }
+ }
+
+ /**
+  * Reports the given position for the current WAL to the source manager and remembers it as
+  * the last logged position.
+  * @param lastReadPosition position in the current WAL to record
+  */
+ protected void updateLogPosition(long lastReadPosition) {
+   ReplicationSourceManager manager = source.getSourceManager();
+   Path walPath = currentPath;
+   String queueId = source.getPeerClusterZnode();
+   // NOTE(review): the meaning of the two trailing flags is defined by the manager; both are
+   // passed as false here.
+   manager.logPositionAndCleanOldLogs(walPath, queueId, lastReadPosition, false, false);
+   this.lastLoggedPosition = lastReadPosition;
+ }
+
+ /**
+  * Writes the given positions for this peer through {@link MetaTableAccessor}.
+  * @param lastPositionsForSerialScope the last sequence ids of the batch, as a key to id map
+  * @throws RuntimeException wrapping the IOException if the update fails
+  */
+ private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
+   try {
+     MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
+       source.getPeerId(), lastPositionsForSerialScope);
+   } catch (IOException e) {
+     LOG.error("updateReplicationPositions fail", e);
+     // Chain the cause so the root failure is visible wherever the exception surfaces.
+     throw new RuntimeException("updateReplicationPositions fail", e);
+   }
+ }
+
+ /**
+  * Starts this shipper as a daemon thread. The thread name is built from the calling thread's
+  * name, the WAL group id and the peer cluster znode of the source.
+  * @param handler handler passed along for uncaught exceptions of this thread
+  */
+ public void startup(UncaughtExceptionHandler handler) {
+   String callerName = Thread.currentThread().getName();
+   String queueId = source.getPeerClusterZnode();
+   String threadName = callerName + ".replicationSource." + walGroupId + "," + queueId;
+   Threads.setDaemonThreadRunning(this, threadName, handler);
+ }
+
+ /** @return the queue of WAL paths handed to this shipper at construction time */
+ public PriorityBlockingQueue<Path> getLogQueue() {
+   return queue;
+ }
+
+ /** @return the WAL path of the batch most recently passed to shipEdits, may be null */
+ public Path getCurrentPath() {
+   return currentPath;
+ }
+
+ /** @return the last position that was logged, or -1 if none has been logged yet */
+ public long getCurrentPosition() {
+   return lastLoggedPosition;
+ }
+
+ /**
+  * Installs the reader that run() takes batches from.
+  * @param entryReader the WAL reader thread feeding this shipper
+  */
+ public void setWALReader(ReplicationSourceWALReaderThread entryReader) {
+   this.entryReader = entryReader;
+ }
+
+ /** @return the start position; this implementation always reports 0 */
+ public long getStartPosition() {
+   return 0;
+ }
+
+ /**
+  * @return true only while the source is active, this worker has not been stopped and the
+  *         thread has not been interrupted
+  */
+ protected boolean isActive() {
+   if (!source.isSourceActive()) {
+     return false;
+   }
+   if (!workerRunning) {
+     return false;
+   }
+   return !isInterrupted();
+ }
+
+ public void setWorkerRunning(boolean workerRunning) {
+ entryReader.setReaderRunning(workerRunning);
+ this.workerRunning = workerRunning;
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
+ */
+ public boolean sleepForRetries(String msg, int sleepMultiplier) {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ }
+ Thread.sleep(this.sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ Thread.currentThread().interrupt();
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 29808e9..ad08866 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 57e54d7..3a7f77b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -82,7 +83,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public String getPeerClusterId() {
+ public String getPeerId() {
String[] parts = peerClusterId.split("-", 2);
return parts.length != 1 ?
parts[0] : peerClusterId;
@@ -98,4 +99,37 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
throws ReplicationException {
return;
}
+
+ // Stub: the dummy always reports its peer as enabled.
+ @Override
+ public boolean isPeerEnabled() {
+ return true;
+ }
+
+ // Stub: the dummy always reports itself as active.
+ @Override
+ public boolean isSourceActive() {
+ return true;
+ }
+
+ // Stub: no metrics are tracked; callers receive null and must not dereference it.
+ @Override
+ public MetricsSource getSourceMetrics() {
+ return null;
+ }
+
+ // Stub: there is no endpoint behind the dummy; callers receive null.
+ @Override
+ public ReplicationEndpoint getReplicationEndpoint() {
+ return null;
+ }
+
+ // Returns the manager field of this dummy.
+ @Override
+ public ReplicationSourceManager getSourceManager() {
+ return manager;
+ }
+
+ // Stub: throttling is a no-op, so this never blocks and never throws.
+ @Override
+ public void tryThrottle(int batchSize) throws InterruptedException {
+ }
+
+ // Stub: nothing happens after a batch is shipped.
+ @Override
+ public void postShipEdits(List<Entry> entries, int batchSize) {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/123086ed/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 026f8e4..26aee6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -462,7 +462,7 @@ public abstract class TestReplicationSourceManager {
// Make sure that the replication source was not initialized
List<ReplicationSourceInterface> sources = manager.getSources();
for (ReplicationSourceInterface source : sources) {
- assertNotEquals("FakePeer", source.getPeerClusterId());
+ assertNotEquals("FakePeer", source.getPeerId());
}
// Create a replication queue for the fake peer