You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/09/22 17:14:08 UTC

[hbase] branch branch-2 updated: HBASE-25067 Edit of log messages around async WAL Replication; checkstyle fixes; and a bugfix

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 9797943  HBASE-25067 Edit of log messages around async WAL Replication; checkstyle fixes; and a bugfix
9797943 is described below

commit 97979436f8cc544491f01dce71e86ed2d3e3cc97
Author: stack <st...@apache.org>
AuthorDate: Fri Sep 18 12:36:23 2020 -0700

    HBASE-25067 Edit of log messages around async WAL Replication; checkstyle fixes; and a bugfix
    
    Editing logging around region replicas: shortening and adding context.
    Checkstyle fixes in edited files while I was in there.
    Bug fix in AssignRegionHandler -- was using M_RS_CLOSE_META to open
    a Region instead of a M_RS_OPEN_META.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/replication/ReplicationPeerImpl.java     |  4 +--
 .../org/apache/hadoop/hbase/master/HMaster.java    |  2 +-
 .../master/procedure/EnableTableProcedure.java     |  4 +--
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 23 +++++++------
 .../hadoop/hbase/regionserver/HRegionServer.java   | 19 +++++-----
 .../regionserver/handler/AssignRegionHandler.java  | 13 ++++---
 .../handler/RegionReplicaFlushHandler.java         | 26 +++++++-------
 .../handler/UnassignRegionHandler.java             | 14 ++++----
 .../hbase/regionserver/wal/ProtobufLogReader.java  |  4 +--
 .../regionserver/ReplicationSource.java            | 40 ++++++++++++----------
 .../replication/regionserver/WALEntryStream.java   | 12 +++----
 .../hadoop/hbase/zookeeper/MetaTableLocator.java   |  6 ++--
 12 files changed, 84 insertions(+), 83 deletions(-)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index d656466..5e33aa9 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -49,7 +49,7 @@ public class ReplicationPeerImpl implements ReplicationPeer {
       ReplicationPeerConfig peerConfig) {
     this.conf = conf;
     this.id = id;
-    this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
+    setPeerState(peerState);
     this.peerConfig = peerConfig;
     this.peerConfigListeners = new ArrayList<>();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ffd5774..d042588 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1765,7 +1765,7 @@ public class HMaster extends HRegionServer implements MasterServices {
           toPrint = regionsInTransition.subList(0, max);
           truncated = true;
         }
-        LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
+        LOG.info(prefix + " not running balancer because " + regionsInTransition.size() +
           " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
         if (!force || metaInTransition) return false;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
index 99d5a27..4ac887f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java
@@ -141,9 +141,9 @@ public class EnableTableProcedure
             }
           } else {
             // the replicasFound is less than the regionReplication
-            LOG.info("Number of replicas has increased. Assigning new region replicas." +
+            LOG.info("Number of replicas has increased for {}. Assigning new region replicas." +
                 "The previous replica count was {}. The current replica count is {}.",
-                (currentMaxReplica + 1), configuredReplicaCount);
+              this.tableName, (currentMaxReplica + 1), configuredReplicaCount);
             regionsOfTable = RegionReplicaUtil.addReplicas(regionsOfTable,
               currentMaxReplica + 1, configuredReplicaCount);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9babc65..9537dda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2408,11 +2408,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
     lock.readLock().lock();
+    boolean flushed = true;
     try {
       if (this.closed.get()) {
         String msg = "Skipping flush on " + this + " because closed";
         LOG.debug(msg);
         status.abort(msg);
+        flushed = false;
         return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
       }
       if (coprocessorHost != null) {
@@ -2429,15 +2431,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!writestate.flushing && writestate.writesEnabled) {
           this.writestate.flushing = true;
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("NOT flushing memstore for region " + this
-                + ", flushing=" + writestate.flushing + ", writesEnabled="
-                + writestate.writesEnabled);
-          }
-          String msg = "Not flushing since "
-              + (writestate.flushing ? "already flushing"
-              : "writes not enabled");
+          String msg = "NOT flushing " + this + " as " + (writestate.flushing ? "already flushing"
+            : "writes are not enabled");
+          LOG.debug(msg);
           status.abort(msg);
+          flushed = false;
           return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
         }
       }
@@ -2475,8 +2473,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     } finally {
       lock.readLock().unlock();
-      LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
-        status.prettyPrintJournal());
+      if (flushed) {
+        // Don't log this journal stuff if no flush -- confusing.
+        LOG.debug("Flush status journal for {}:\n{}", this.getRegionInfo().getEncodedName(),
+          status.prettyPrintJournal());
+      }
       status.cleanup();
     }
   }
@@ -5003,7 +5004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   public void setReadsEnabled(boolean readsEnabled) {
    if (readsEnabled && !this.writestate.readsEnabled) {
-     LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
+     LOG.info("Enabling reads for {}", getRegionInfo().getEncodedName());
     }
     this.writestate.setReadsEnabled(readsEnabled);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 11d6559..a80881e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -119,10 +119,11 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
@@ -139,8 +140,6 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
-import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -161,7 +160,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
@@ -2224,9 +2222,9 @@ public class HRegionServer extends Thread implements
         this.walRoller.addWAL(wal);
       }
       return wal;
-    }catch (FailedCloseWALAfterInitializedErrorException ex) {
+    } catch (FailedCloseWALAfterInitializedErrorException ex) {
       // see HBASE-21751 for details
-      abort("wal can not clean up after init failed", ex);
+      abort("WAL can not clean up after init failed", ex);
       throw ex;
     }
   }
@@ -2467,10 +2465,13 @@ public class HRegionServer extends Thread implements
     region.setReadsEnabled(false); // disable reads before marking the region as opened.
     // RegionReplicaFlushHandler might reset this.
 
-    // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
+    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
       this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+    } else {
+      LOG.info("Executor is null; not running flush of primary region replica for {}",
+        region.getRegionInfo());
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 3474bf9..98d09b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -92,7 +92,7 @@ public class AssignRegionHandler extends EventHandler {
     String regionName = regionInfo.getRegionNameAsString();
     Region onlineRegion = rs.getRegion(encodedName);
     if (onlineRegion != null) {
-      LOG.warn("Received OPEN for the region:{}, which is already online", regionName);
+      LOG.warn("Received OPEN for {} which is already online", regionName);
       // Just follow the old behavior, do we need to call reportRegionStateTransition? Maybe not?
       // For normal case, it could happen that the rpc call to schedule this handler is succeeded,
       // but before returning to master the connection is broken. And when master tries again, we
@@ -104,7 +104,7 @@ public class AssignRegionHandler extends EventHandler {
     if (previous != null) {
       if (previous) {
         // The region is opening and this maybe a retry on the rpc call, it is safe to ignore it.
-        LOG.info("Receiving OPEN for the region:{}, which we are already trying to OPEN" +
+        LOG.info("Receiving OPEN for {} which we are already trying to OPEN" +
           " - ignoring this new request for this region.", regionName);
       } else {
         // The region is closing. This is possible as we will update the region state to CLOSED when
@@ -113,7 +113,7 @@ public class AssignRegionHandler extends EventHandler {
         // closing process.
         long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
         LOG.info(
-          "Receiving OPEN for the region:{}, which we are trying to close, try again after {}ms",
+          "Receiving OPEN for {} which we are trying to close, try again after {}ms",
           regionName, backoff);
         rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
       }
@@ -145,11 +145,10 @@ public class AssignRegionHandler extends EventHandler {
     Boolean current = rs.getRegionsInTransitionInRS().remove(regionInfo.getEncodedNameAsBytes());
     if (current == null) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was NOT in transition. Region={}",
-        regionName);
+      LOG.error("Bad state: we've just opened {} which was NOT in transition", regionName);
     } else if (!current) {
       // Should NEVER happen, but let's be paranoid.
-      LOG.error("Bad state: we've just opened a region that was closing. Region={}", regionName);
+      LOG.error("Bad state: we've just opened {} which was closing", regionName);
     }
   }
 
@@ -168,7 +167,7 @@ public class AssignRegionHandler extends EventHandler {
       long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
     EventType eventType;
     if (regionInfo.isMetaRegion()) {
-      eventType = EventType.M_RS_CLOSE_META;
+      eventType = EventType.M_RS_OPEN_META;
     } else if (regionInfo.getTable().isSystemTable() ||
       (tableDesc != null && tableDesc.getPriority() >= HConstants.ADMIN_QOS)) {
       eventType = EventType.M_RS_OPEN_PRIORITY_REGION;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 81b6d7e..a20443c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
@@ -42,9 +41,9 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 
 /**
- * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to wal in
+ * HBASE-11580: With the async wal approach (HBASE-11568), the edits are not persisted to WAL in
  * secondary region replicas. This means that a secondary region replica can serve some edits from
- * it's memstore that that is still not flushed from primary. We do not want to allow secondary
+ * its memstore that are still not flushed from primary. We do not want to allow secondary
  * region's seqId to go back in time, when this secondary region is opened elsewhere after a
  * crash or region move. We will trigger a flush cache in the primary region replica and wait
  * for observing a complete flush cycle before marking the region readsEnabled. This handler does
@@ -53,7 +52,6 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
  */
 @InterfaceAudience.Private
 public class RegionReplicaFlushHandler extends EventHandler {
-
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaFlushHandler.class);
 
   private final ClusterConnection connection;
@@ -83,7 +81,7 @@ public class RegionReplicaFlushHandler extends EventHandler {
     if (t instanceof InterruptedIOException || t instanceof InterruptedException) {
       LOG.error("Caught throwable while processing event " + eventType, t);
     } else if (t instanceof RuntimeException) {
-      server.abort("ServerAborting because a runtime exception was thrown", t);
+      server.abort("Server aborting", t);
     } else {
       // something fishy since we cannot flush the primary region until all retries (retries from
       // rpc times 35 trigger). We cannot close the region since there is no such mechanism to
@@ -111,9 +109,9 @@ public class RegionReplicaFlushHandler extends EventHandler {
     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Attempting to do an RPC to the primary region replica " + ServerRegionReplicaUtil
-        .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName() + " of region "
-       + region.getRegionInfo().getEncodedName() + " to trigger a flush");
+      LOG.debug("RPC'ing to primary region replica " +
+        ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
+        region.getRegionInfo() + " to trigger FLUSH");
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
@@ -139,11 +137,11 @@ public class RegionReplicaFlushHandler extends EventHandler {
         // then we have to wait for seeing the flush entry. All reads will be rejected until we see
         // a complete flush cycle or replay a region open event
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully triggered a flush of primary region replica "
+          LOG.debug("Triggered flush of primary region replica "
               + ServerRegionReplicaUtil
                 .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                + " of region " + region.getRegionInfo().getEncodedName()
-                + " Now waiting and blocking reads until observing a full flush cycle");
+                + " for " + region.getRegionInfo().getEncodedName()
+                + "; now waiting and blocking reads until completes a full flush cycle");
         }
         region.setReadsEnabled(true);
         break;
@@ -151,10 +149,10 @@ public class RegionReplicaFlushHandler extends EventHandler {
         if (response.hasWroteFlushWalMarker()) {
           if(response.getWroteFlushWalMarker()) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Successfully triggered an empty flush marker(memstore empty) of primary "
+              LOG.debug("Triggered empty flush marker (memstore empty) on primary "
                   + "region replica " + ServerRegionReplicaUtil
                     .getRegionInfoForDefaultReplica(region.getRegionInfo()).getEncodedName()
-                  + " of region " + region.getRegionInfo().getEncodedName() + " Now waiting and "
+                  + " for " + region.getRegionInfo().getEncodedName() + "; now waiting and "
                   + "blocking reads until observing a flush marker");
             }
             region.setReadsEnabled(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 8b275d0..0bf2543a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -84,19 +84,18 @@ public class UnassignRegionHandler extends EventHandler {
         // reportRegionStateTransition, so the HMaster will think the region is online, before we
         // actually open the region, as reportRegionStateTransition is part of the opening process.
         long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
-        LOG.warn("Received CLOSE for the region: {}, which we are already " +
-          "trying to OPEN. try again after {}ms", encodedName, backoff);
+        LOG.warn("Received CLOSE for {} which we are already " +
+          "trying to OPEN; try again after {}ms", encodedName, backoff);
         rs.getExecutorService().delayedSubmit(this, backoff, TimeUnit.MILLISECONDS);
       } else {
-        LOG.info("Received CLOSE for the region: {}, which we are already trying to CLOSE," +
+        LOG.info("Received CLOSE for {} which we are already trying to CLOSE," +
           " but not completed yet", encodedName);
       }
       return;
     }
     HRegion region = rs.getRegion(encodedName);
     if (region == null) {
-      LOG.debug(
-        "Received CLOSE for a region {} which is not online, and we're not opening/closing.",
+      LOG.debug("Received CLOSE for {} which is not ONLINE and we're not opening/closing.",
         encodedName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
@@ -114,10 +113,11 @@ public class UnassignRegionHandler extends EventHandler {
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but now split is done at
       // master side, so...
-      LOG.warn("Can't close region {}, was already closed during close()", regionName);
+      LOG.warn("Can't close {} already closed during close()", regionName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
     }
+
     rs.removeRegion(region, destination);
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 6f537df..0967c10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -412,14 +412,14 @@ public class ProtobufLogReader extends ReaderBase {
           + "because originalPosition is negative. last offset={}", this.inputStream.getPos(), eof);
         throw eof;
       }
-      // If stuck at the same place and we got and exception, lets go back at the beginning.
+      // If stuck at the same place and we got an exception, let's go back to the beginning.
       if (inputStream.getPos() == originalPosition) {
         if (resetPosition) {
           LOG.warn("Encountered a malformed edit, seeking to the beginning of the WAL since "
             + "current position and original position match at {}", originalPosition);
           seekOnFs(0);
         } else {
-          LOG.debug("Reached the end of file at position {}", originalPosition);
+          LOG.debug("EOF at position {}", originalPosition);
         }
       } else {
         // Else restore our position to original location in hope that next time through we will
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c495376..0b7ed16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -237,21 +237,22 @@ public class ReplicationSource implements ReplicationSourceInterface {
       LOG.trace("NOT replicating {}", wal);
       return;
     }
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    // Use WAL prefix as the WALGroupId for this peer.
+    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
+    PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
-      queues.put(logPrefix, queue);
+      queues.put(walPrefix, queue);
       if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that wal enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
-        tryStartNewShipper(logPrefix, queue);
+        tryStartNewShipper(walPrefix, queue);
       }
     }
     queue.put(wal);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), logPrefix,
+      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
         this.replicationQueueInfo.getQueueId());
     }
     this.metrics.incrSizeOfLogQueue();
@@ -260,7 +261,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (queueSize > this.logQueueWarnThreshold) {
       LOG.warn("{} WAL group {} queue size: {} exceeds value of "
           + "replication.source.log.queue.warn: {}", logPeerId(),
-        logPrefix, queueSize, logQueueWarnThreshold);
+        walPrefix, queueSize, logQueueWarnThreshold);
     }
   }
 
@@ -357,14 +358,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
     ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
     ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
     if (extant != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
-          walGroupId);
-      }
+        LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
     } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
-      }
+      LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
       ReplicationSourceWALReader walReader =
         createNewWALReader(walGroupId, queue, worker.getStartPosition());
       Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
@@ -435,8 +431,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   /**
    * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
-   * @return The WAL Entry Filter Chain this ReplicationSource will use on WAL files filtering
-   * out WALEntry edits.
+   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
    */
   @VisibleForTesting
   WALEntryFilter getWalEntryFilter() {
@@ -575,7 +570,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (!this.isSourceActive()) {
       return;
     }
-    LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+    LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
 
     initializeWALEntryFilter(peerClusterId);
@@ -589,7 +584,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   @Override
   public void startup() {
-    // mark we are running now
+    if (this.sourceRunning) {
+      return;
+    }
+    // Mark we are running now
     this.sourceRunning = true;
     initThread = new Thread(this::initialize);
     Threads.setDaemonThreadRunning(initThread,
@@ -612,7 +610,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     terminate(reason, cause, clearMetrics, true);
   }
 
-  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
+  public void terminate(String reason, Exception cause, boolean clearMetrics,
+      boolean join) {
     if (cause == null) {
       LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
     } else {
@@ -798,7 +797,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return queueStorage;
   }
 
+  /**
+   * @return String to use as a log prefix that contains current peerId.
+   */
   private String logPeerId(){
-    return "[Source for peer " + this.getPeerId() + "]:";
+    return "peerId=" + this.getPeerId() + ",";
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index d2cb6f3..7e67f53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -174,7 +174,7 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
-      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
+      LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.
@@ -222,7 +222,7 @@ class WALEntryStream implements Closeable {
         if (currentPositionOfReader < stat.getLen()) {
           final long skippedBytes = stat.getLen() - currentPositionOfReader;
           LOG.debug(
-            "Reached the end of WAL file '{}'. It was not closed cleanly," +
+            "Reached the end of WAL {}. It was not closed cleanly," +
               " so we did not parse {} bytes of data. This is normally ok.",
             currentPath, skippedBytes);
           metrics.incrUncleanlyClosedWALs();
@@ -230,7 +230,7 @@ class WALEntryStream implements Closeable {
         }
       } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
         LOG.warn(
-          "Processing end of WAL file '{}'. At position {}, which is too far away from" +
+          "Processing end of WAL {} at position {}, which is too far away from" +
             " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
           currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
         setPosition(0);
@@ -241,7 +241,7 @@ class WALEntryStream implements Closeable {
       }
     }
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
+      LOG.trace("Reached the end of " + this.currentPath + " and length of the file is " +
         (stat == null ? "N/A" : stat.getLen()));
     }
     metrics.incrCompletedWAL();
@@ -249,7 +249,7 @@ class WALEntryStream implements Closeable {
   }
 
   private void dequeueCurrentLog() throws IOException {
-    LOG.debug("Reached the end of log {}", currentPath);
+    LOG.debug("EOF, closing {}", currentPath);
     closeReader();
     logQueue.remove();
     setPosition(0);
@@ -264,7 +264,7 @@ class WALEntryStream implements Closeable {
     long readerPos = reader.getPosition();
     OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
     if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
-      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+      // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
       // data, so we need to make sure that we do not read beyond the committed file length.
       if (LOG.isDebugEnabled()) {
         LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index bb02af3..557ba77 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -221,7 +221,7 @@ public final class MetaTableLocator {
       LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
       return;
     }
-    LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
+    LOG.info("Setting hbase:meta replicaId={} location in ZooKeeper as {}, state={}", replicaId,
       serverName, state);
     // Make the MetaRegionServer pb and then get its bytes and save this as
     // the znode content.
@@ -235,9 +235,9 @@ public final class MetaTableLocator {
           zookeeper.getZNodePaths().getZNodeForReplica(replicaId), data);
     } catch(KeeperException.NoNodeException nne) {
       if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
-        LOG.debug("META region location doesn't exist, create it");
+        LOG.debug("hbase:meta region location doesn't exist, create it");
       } else {
-        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
+        LOG.debug("hbase:meta region location doesn't exist for replicaId=" + replicaId +
             ", create it");
       }
       ZKUtil.createAndWatch(zookeeper, zookeeper.getZNodePaths().getZNodeForReplica(replicaId),