You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/08/07 09:53:11 UTC
[hbase] branch branch-2 updated: HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new afb32a3 HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier
afb32a3 is described below
commit afb32a3361170efc86f57a3c077fab4ffd6e910f
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Wed Aug 7 10:52:16 2019 +0100
HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
.../HBaseInterClusterReplicationEndpoint.java | 54 +++++++++++------
.../regionserver/ReplicationSource.java | 70 ++++++++++++++--------
2 files changed, 79 insertions(+), 45 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 57301fc..b427c03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -199,11 +199,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
- LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ LOG.trace("{} {}, sleeping {} times {}",
+ logPeerId(), msg, sleepForRetries, sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping between retries");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
+ }
}
return sleepMultiplier < maxRetriesMultiplier;
}
@@ -288,7 +291,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
try {
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
} catch (IOException ioe) {
- LOG.warn("Failed to create connection for peer cluster", ioe);
+ LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
}
if (connection != null) {
this.conn = connection;
@@ -301,8 +304,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
for (int i = 0; i < batches.size(); i++) {
List<Entry> entries = batches.get(i);
if (!entries.isEmpty()) {
- LOG.trace("Submitting {} entries of total size {}", entries.size(),
- replicateContext.getSize());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+ replicateContext.getSize());
+ }
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
pool.submit(createReplicator(entries, i));
futures++;
@@ -353,8 +358,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
int numSinks = replicationSinkMgr.getNumSinks();
if (numSinks == 0) {
- LOG.warn("No replication sinks found, returning without replicating. The source should " +
- "retry with the same set of edits.");
+ LOG.warn("{} No replication sinks found, returning without replicating. "
+ + "The source should retry with the same set of edits.", logPeerId());
return false;
}
@@ -376,7 +381,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
ioe = ((RemoteException) ioe).unwrapRemoteException();
- LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+ LOG.warn("{} Can't replicate because of an error on the remote cluster: ", logPeerId(),
+ ioe);
if (ioe instanceof TableNotFoundException) {
if (dropOnDeletedTables) {
// this is a bit fragile, but cannot change how TNFE is serialized
@@ -389,19 +395,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Would potentially be better to retry in one of the outer loops
// and add a table filter there; but that would break the encapsulation,
// so we're doing the filtering here.
- LOG.info("Missing table detected at sink, local table also does not exist, " +
- "filtering edits for '" + table + "'");
+ LOG.info("{} Missing table detected at sink, local table also does not "
+ + "exist, filtering edits for '{}'", logPeerId(), table);
batches = filterBatches(batches, table);
continue;
}
} catch (IOException iox) {
- LOG.warn("Exception checking for local table: ", iox);
+ LOG.warn("{} Exception checking for local table: ", logPeerId(), iox);
}
}
}
// fall through and sleep below
} else {
- LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
+ LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
+ ioe);
replicationSinkMgr.chooseSinks();
}
} else {
@@ -414,10 +421,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
"caused by a machine failure or a massive slowdown",
this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
- LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+ LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
replicationSinkMgr.chooseSinks();
} else {
- LOG.warn("Can't replicate because of a local or network error: ", ioe);
+ LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
}
}
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
@@ -440,7 +447,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.conn.close();
this.conn = null;
} catch (IOException e) {
- LOG.warn("Failed to close the connection");
+ LOG.warn("{} Failed to close the connection", logPeerId());
}
}
// Allow currently running replication tasks to finish
@@ -466,17 +473,21 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
int entriesHashCode = System.identityHashCode(entries);
if (LOG.isTraceEnabled()) {
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
- LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}",
- entriesHashCode, entries.size(), size, replicationClusterId);
+ LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
+ logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
try {
ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
- LOG.trace("Completed replicating batch {}", entriesHashCode);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
+ }
} catch (IOException e) {
- LOG.trace("Failed replicating batch {}", entriesHashCode, e);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+ }
throw e;
}
replicationSinkMgr.reportSinkSuccess(sinkPeer);
@@ -514,4 +525,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
: () -> replicateEntries(entries, batchIndex);
}
+
+ private String logPeerId(){
+ return "[Source for peer " + this.ctx.getPeerId() + "]:";
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 894ebed..8d0e661 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -207,14 +207,17 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
}
queue.put(log);
- LOG.trace("Added log file {} to queue of source {}.", logPrefix,
- this.replicationQueueInfo.getQueueId());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
+ this.replicationQueueInfo.getQueueId());
+ }
this.metrics.incrSizeOfLogQueue();
// This will log a warning for each new log that gets created above the warn threshold
int queueSize = queue.size();
if (queueSize > this.logQueueWarnThreshold) {
- LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
- + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
+ LOG.warn("{} WAL group {} queue size: {} exceeds value of "
+ + "replication.source.log.queue.warn: {}", logPeerId(),
+ logPrefix, queueSize, logQueueWarnThreshold);
}
}
@@ -229,8 +232,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
- LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
- + Bytes.toString(family) + " to peer id " + peerId);
+ LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+ tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
@@ -302,9 +305,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
- LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
+ walGroupId);
+ }
} else {
- LOG.debug("Starting up worker for wal group {}", walGroupId);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
+ }
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@@ -334,7 +342,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
} else {
currentPath = new Path("NO_LOGS_IN_QUEUE");
- LOG.warn("No replication ongoing, waiting for new log");
+ LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
}
ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
statusBuilder.withPeerId(this.getPeerId())
@@ -375,7 +383,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected final void uncaughtException(Thread t, Throwable e) {
RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+ LOG.error("Unexpected exception in {} currentPath={}",
+ t.getName(), getCurrentPath(), e);
server.abort("Unexpected exception in " + t.getName(), e);
}
@@ -396,7 +405,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
long sleepTicks = throttler.getNextSleepInterval(batchSize);
if (sleepTicks > 0) {
if (LOG.isTraceEnabled()) {
- LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+ LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
}
Thread.sleep(sleepTicks);
// reset throttler's cycle start tick when sleep for throttling occurs
@@ -430,11 +439,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
- LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+ LOG.trace("{} {}, sleeping {} times {}",
+ logPeerId(), msg, sleepForRetries, sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping between retries");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
+ }
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
@@ -456,7 +468,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
try {
replicationEndpoint = createReplicationEndpoint();
} catch (Exception e) {
- LOG.warn("error creating ReplicationEndpoint, retry", e);
+ LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -468,7 +480,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.replicationEndpoint = replicationEndpoint;
break;
} catch (Exception e) {
- LOG.warn("Error starting ReplicationEndpoint, retry", e);
+ LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
@@ -486,8 +498,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
for (;;) {
peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && peerClusterId == null) {
- LOG.debug("Could not connect to Peer ZK. Sleeping for "
- + (this.sleepForRetries * sleepMultiplier) + " millis.");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
+ (this.sleepForRetries * sleepMultiplier));
+ }
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -505,8 +519,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.manager.removeSource(this);
return;
}
- LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
- this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
+ LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+ logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
initializeWALEntryFilter(peerClusterId);
// start workers
@@ -539,10 +553,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
- LOG.info("Closing source " + this.queueId + " because: " + reason);
+ LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
} else {
- LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
- cause);
+ LOG.error("{} Closing source {} because an error occurred: {}",
+ logPeerId(), this.queueId, reason, cause);
}
this.sourceRunning = false;
if (initThread != null && Thread.currentThread() != initThread) {
@@ -566,7 +580,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
// Wait worker to stop
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
- LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+ LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
Thread.currentThread().interrupt();
}
// If worker still is alive after waiting, interrupt it
@@ -586,15 +600,15 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
- LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+ LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
}
if (this.replicationEndpoint != null) {
try {
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
- LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
- this.queueId, te);
+ LOG.warn("{} Got exception while waiting for endpoint to shutdown "
+ + "for replication source : {}", logPeerId(), this.queueId, te);
}
}
}
@@ -719,4 +733,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
+
+ private String logPeerId(){
+ return "[Source for peer " + this.getPeerId() + "]:";
+ }
}