You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/08/07 10:19:35 UTC

[hbase] branch branch-2.1 updated: HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 46d0e5a  HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier
46d0e5a is described below

commit 46d0e5a05d2edc9a3efb6e284606ccc77bece791
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Wed Aug 7 11:07:11 2019 +0100

    HBASE-22731 ReplicationSource and HBaseInterClusterReplicationEndpoint log messages should include a target Peer identifier
    
    Signed-off-by: Peter Somogyi <ps...@apache.org>
---
 .../HBaseInterClusterReplicationEndpoint.java      | 54 ++++++++++++-------
 .../regionserver/ReplicationSource.java            | 60 +++++++++++++++-------
 2 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa..0014d9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -199,11 +199,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+        LOG.trace("{} {}, sleeping {} times {}",
+          logPeerId(), msg, sleepForRetries, sleepMultiplier);
       }
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
+      }
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
@@ -288,7 +291,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     try {
       connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
     } catch (IOException ioe) {
-      LOG.warn("Failed to create connection for peer cluster", ioe);
+      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
     }
     if (connection != null) {
       this.conn = connection;
@@ -301,8 +304,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
       if (!entries.isEmpty()) {
-        LOG.trace("Submitting {} entries of total size {}", entries.size(),
-          replicateContext.getSize());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+            replicateContext.getSize());
+        }
         // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
         pool.submit(createReplicator(entries, i));
         futures++;
@@ -354,8 +359,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
     int numSinks = replicationSinkMgr.getNumSinks();
     if (numSinks == 0) {
-      LOG.warn("No replication sinks found, returning without replicating. The source should " +
-          "retry with the same set of edits.");
+      LOG.warn("{} No replication sinks found, returning without replicating. "
+        + "The source should retry with the same set of edits.", logPeerId());
       return false;
     }
 
@@ -386,7 +391,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         this.metrics.refreshAgeOfLastShippedOp(walGroupId);
         if (ioe instanceof RemoteException) {
           ioe = ((RemoteException) ioe).unwrapRemoteException();
-          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
+          LOG.warn("{} Can't replicate because of an error on the remote cluster: ",
+            logPeerId(), ioe);
           if (ioe instanceof TableNotFoundException) {
             if (dropOnDeletedTables) {
               // this is a bit fragile, but cannot change how TNFE is serialized
@@ -399,19 +405,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
                     // Would potentially be better to retry in one of the outer loops
                     // and add a table filter there; but that would break the encapsulation,
                     // so we're doing the filtering here.
-                    LOG.info("Missing table detected at sink, local table also does not exist, " +
-                        "filtering edits for '" + table + "'");
+                    LOG.info("{} Missing table detected at sink, local table also does not "
+                      + "exist, filtering edits for '{}'", logPeerId(), table);
                     batches = filterBatches(batches, table);
                     continue;
                   }
                 } catch (IOException iox) {
-                  LOG.warn("Exception checking for local table: ", iox);
+                  LOG.warn("{} Exception checking for local table: ", logPeerId(), iox);
                 }
               }
             }
             // fall through and sleep below
           } else {
-            LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
+            LOG.warn("{} Peer encountered RemoteException, rechecking all sinks: ", logPeerId(),
+              ioe);
             replicationSinkMgr.chooseSinks();
           }
         } else {
@@ -424,10 +431,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
                   "caused by a machine failure or a massive slowdown",
               this.socketTimeoutMultiplier);
           } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
-            LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
+            LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
             replicationSinkMgr.chooseSinks();
           } else {
-            LOG.warn("Can't replicate because of a local or network error: ", ioe);
+            LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
           }
         }
         if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
@@ -450,7 +457,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         this.conn.close();
         this.conn = null;
       } catch (IOException e) {
-        LOG.warn("Failed to close the connection");
+        LOG.warn("{} Failed to close the connection", logPeerId());
       }
     }
     // Allow currently running replication tasks to finish
@@ -476,17 +483,21 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       int entriesHashCode = System.identityHashCode(entries);
       if (LOG.isTraceEnabled()) {
         long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}",
-          entriesHashCode, entries.size(), size, replicationClusterId);
+        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
+          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = replicationSinkMgr.getReplicationSink();
       BlockingInterface rrs = sinkPeer.getRegionServer();
       try {
         ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
           replicationClusterId, baseNamespaceDir, hfileArchiveDir);
-        LOG.trace("Completed replicating batch {}", entriesHashCode);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
+        }
       } catch (IOException e) {
-        LOG.trace("Failed replicating batch {}", entriesHashCode, e);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+        }
         throw e;
       }
       replicationSinkMgr.reportSinkSuccess(sinkPeer);
@@ -524,4 +535,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
         : () -> replicateEntries(entries, batchIndex);
   }
+
+  private String logPeerId(){
+    return "[Source for peer " + this.ctx.getPeerId() + "]:";
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8dd8568..b4c9ad3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -207,6 +207,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
       }
     }
     queue.put(log);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Added log file {} to queue of source {}.", logPeerId(), logPrefix,
+        this.replicationQueueInfo.getQueueId());
+    }
     this.metrics.incrSizeOfLogQueue();
     // This will log a warning for each new log that gets created above the warn threshold
     int queueSize = queue.size();
@@ -227,8 +231,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
         this.queueStorage.addHFileRefs(peerId, pairs);
         metrics.incrSizeOfHFileRefsQueue(pairs.size());
       } else {
-        LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
-            + Bytes.toString(family) + " to peer id " + peerId);
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
       }
     } else {
       // user has explicitly not defined any table cfs for replication, means replicate all the
@@ -300,9 +304,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
     ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
-      LOG.debug("Someone has beat us to start a worker thread for wal group {}", walGroupId);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
+          walGroupId);
+      }
     } else {
-      LOG.debug("Starting up worker for wal group {}", walGroupId);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
+      }
       ReplicationSourceWALReader walReader =
         createNewWALReader(walGroupId, queue, worker.getStartPosition());
       Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@@ -334,7 +343,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
         }
       } else {
         currentPath = new Path("NO_LOGS_IN_QUEUE");
-        LOG.warn("No replication ongoing, waiting for new log");
+        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
       }
       ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
       statusBuilder.withPeerId(this.getPeerId())
@@ -375,7 +384,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   protected final void uncaughtException(Thread t, Throwable e) {
     RSRpcServices.exitIfOOME(e);
-    LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+    LOG.error("Unexpected exception in {} currentPath={}",
+      t.getName(), getCurrentPath(), e);
     server.abort("Unexpected exception in " + t.getName(), e);
   }
 
@@ -396,7 +406,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       long sleepTicks = throttler.getNextSleepInterval(batchSize);
       if (sleepTicks > 0) {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
+          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
         }
         Thread.sleep(sleepTicks);
         // reset throttler's cycle start tick when sleep for throttling occurs
@@ -430,11 +440,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
+        LOG.trace("{} {}, sleeping {} times {}",
+          logPeerId(), msg, sleepForRetries, sleepMultiplier);
       }
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
+      }
       Thread.currentThread().interrupt();
     }
     return sleepMultiplier < maxRetriesMultiplier;
@@ -456,7 +469,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       try {
         replicationEndpoint = createReplicationEndpoint();
       } catch (Exception e) {
-        LOG.warn("error creating ReplicationEndpoint, retry", e);
+        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
         if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -468,7 +481,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
         this.replicationEndpoint = replicationEndpoint;
         break;
       } catch (Exception e) {
-        LOG.warn("Error starting ReplicationEndpoint, retry", e);
+        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
         replicationEndpoint.stop();
         if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
           sleepMultiplier++;
@@ -486,6 +499,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
     for (;;) {
       peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isSourceActive() && peerClusterId == null) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
+            (this.sleepForRetries * sleepMultiplier));
+        }
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -503,7 +520,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
       this.manager.removeSource(this);
       return;
     }
-    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
 
     initializeWALEntryFilter(peerClusterId);
     // start workers
@@ -536,10 +554,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   public void terminate(String reason, Exception cause, boolean join) {
     if (cause == null) {
-      LOG.info("Closing source " + this.queueId + " because: " + reason);
+      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
     } else {
-      LOG.error("Closing source " + this.queueId + " because an error occurred: " + reason,
-        cause);
+      LOG.error("{} Closing source {} because an error occurred: {}",
+        logPeerId(), this.queueId, reason, cause);
     }
     this.sourceRunning = false;
     if (initThread != null && Thread.currentThread() != initThread) {
@@ -561,7 +579,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
           // Wait worker to stop
           Thread.sleep(this.sleepForRetries);
         } catch (InterruptedException e) {
-          LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
+          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
           Thread.currentThread().interrupt();
         }
         // If worker still is alive after waiting, interrupt it
@@ -581,15 +599,15 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (join) {
       for (ReplicationSourceShipper worker : workers) {
         Threads.shutdown(worker, this.sleepForRetries);
-        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
+        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
       }
       if (this.replicationEndpoint != null) {
         try {
           this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
             TimeUnit.MILLISECONDS);
         } catch (TimeoutException te) {
-          LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
-            this.queueId, te);
+          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
+            + "for replication source : {}", logPeerId(), this.queueId, te);
         }
       }
     }
@@ -697,4 +715,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
+
+  private String logPeerId(){
+    return "[Source for peer " + this.getPeerId() + "]:";
+  }
 }