You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/11/17 10:14:06 UTC

[hbase] branch master updated: HBASE-23102: Improper Usage of Map putIfAbsent (#828)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new a3efa59  HBASE-23102: Improper Usage of Map putIfAbsent (#828)
a3efa59 is described below

commit a3efa5911d071a6848459b37a220d36177aec490
Author: belugabehr <12...@users.noreply.github.com>
AuthorDate: Sun Nov 17 05:13:52 2019 -0500

    HBASE-23102: Improper Usage of Map putIfAbsent (#828)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/procedure2/ProcedureExecutor.java |  8 +++--
 .../procedure2/RemoteProcedureDispatcher.java      |  3 +-
 .../hadoop/hbase/rsgroup/RSGroupAdminServer.java   | 22 +++++++-----
 .../hadoop/hbase/executor/ExecutorService.java     | 29 ++++++++--------
 .../hadoop/hbase/master/RegionsRecoveryChore.java  |  5 ++-
 .../hbase/master/assignment/RegionStates.java      | 23 +++----------
 .../quotas/FileArchiverNotifierFactoryImpl.java    | 14 +++-----
 .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 32 +++++++++--------
 .../throttle/StoreHotnessProtector.java            | 19 ++++------
 .../regionserver/ReplicationSource.java            | 40 +++++++++++++---------
 10 files changed, 93 insertions(+), 102 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 6d1ffb4..05a0c2c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -828,10 +828,12 @@ public class ProcedureExecutor<TEnvironment> {
       return;
     }
 
-    Procedure<TEnvironment> proc =
-      new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);
+    completed.computeIfAbsent(procId, (key) -> {
+      Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
+          procName, procOwner, nonceKey, exception);
 
-    completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
+      return new CompletedProcedureRetainer<>(proc);
+    });
   }
 
   // ==========================================================================
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index 02f6d7d..7d1bd4e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -147,8 +147,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
    */
   public void addNode(final TRemote key) {
     assert key != null: "Tried to add a node with a null key";
-    final BufferNode newNode = new BufferNode(key);
-    nodeMap.putIfAbsent(key, newNode);
+    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
   }
 
   /**
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
index f3ef4fb..e6bf69e 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
@@ -581,9 +581,9 @@ public class RSGroupAdminServer implements RSGroupAdmin {
       ServerName currServer = entry.getValue();
       RegionInfo currRegion = entry.getKey();
       if (rsGroupInfo.getTables().contains(currTable)) {
-        assignments.putIfAbsent(currTable, new HashMap<>());
-        assignments.get(currTable).putIfAbsent(currServer, new ArrayList<>());
-        assignments.get(currTable).get(currServer).add(currRegion);
+        assignments.computeIfAbsent(currTable, key -> new HashMap<>())
+            .computeIfAbsent(currServer, key -> new ArrayList<>())
+            .add(currRegion);
       }
     }
 
@@ -595,12 +595,16 @@ public class RSGroupAdminServer implements RSGroupAdmin {
     }
 
     // add all tables that are members of the group
-    for(TableName tableName : rsGroupInfo.getTables()) {
-      if(assignments.containsKey(tableName)) {
-        result.put(tableName, new HashMap<>());
-        result.get(tableName).putAll(serverMap);
-        result.get(tableName).putAll(assignments.get(tableName));
-        LOG.debug("Adding assignments for {}: {}", tableName, assignments.get(tableName));
+    for (TableName tableName : rsGroupInfo.getTables()) {
+      if (assignments.containsKey(tableName)) {
+        Map<ServerName, List<RegionInfo>> tableResults = new HashMap<>(serverMap);
+
+        Map<ServerName, List<RegionInfo>> tableAssignments = assignments.get(tableName);
+        tableResults.putAll(tableAssignments);
+
+        result.put(tableName, tableResults);
+
+        LOG.debug("Adding assignments for {}: {}", tableName, tableAssignments);
       }
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index ea788ac..c12c30a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -63,7 +63,7 @@ public class ExecutorService {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
 
   // hold the all the executors created in a map addressable by their names
-  private final ConcurrentHashMap<String, Executor> executorMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Executor> executorMap = new ConcurrentHashMap<>();
 
   // Name of the server hosting this executor service.
   private final String servername;
@@ -87,18 +87,18 @@ public class ExecutorService {
    */
   @VisibleForTesting
   public void startExecutorService(String name, int maxThreads) {
-    if (this.executorMap.get(name) != null) {
-      throw new RuntimeException("An executor service with the name " + name +
-        " is already running!");
-    }
-    Executor hbes = new Executor(name, maxThreads);
-    if (this.executorMap.putIfAbsent(name, hbes) != null) {
-      throw new RuntimeException("An executor service with the name " + name +
-      " is already running (2)!");
-    }
-    LOG.debug("Starting executor service name=" + name +
-      ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
-      ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
+    Executor hbes = this.executorMap.compute(name, (key, value) -> {
+      if (value != null) {
+        throw new RuntimeException("An executor service with the name " + key +
+            " is already running!");
+      }
+      return new Executor(key, maxThreads);
+    });
+
+    LOG.debug(
+        "Starting executor service name={}, corePoolSize={}, maxPoolSize={}",
+        name, hbes.threadPoolExecutor.getCorePoolSize(),
+        hbes.threadPoolExecutor.getMaximumPoolSize());
   }
 
   boolean isExecutorServiceRunning(String name) {
@@ -134,7 +134,8 @@ public class ExecutorService {
   public void startExecutorService(final ExecutorType type, final int maxThreads) {
     String name = type.getExecutorName(this.servername);
     if (isExecutorServiceRunning(name)) {
-      LOG.debug("Executor service " + toString() + " already running on " + this.servername);
+      LOG.debug("Executor service {} already running on {}", this,
+          this.servername);
       return;
     }
     startExecutorService(name, maxThreads);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
index c5ad867..9f20434 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionsRecoveryChore.java
@@ -161,9 +161,8 @@ public class RegionsRecoveryChore extends ScheduledChore {
     }
     LOG.warn("Region {} for Table {} has high storeFileRefCount {}, considering it for reopen..",
       regionInfo.getRegionNameAsString(), tableName, regionStoreRefCount);
-    tableToReopenRegionsMap.putIfAbsent(tableName, new ArrayList<>());
-    tableToReopenRegionsMap.get(tableName).add(regionName);
-
+    tableToReopenRegionsMap
+        .computeIfAbsent(tableName, (key) -> new ArrayList<>()).add(regionName);
   }
 
   // hashcode/equals implementation to ensure at-most one object of RegionsRecoveryChore
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
index 6654620..f245500 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java
@@ -112,9 +112,8 @@ public class RegionStates {
   // ==========================================================================
   @VisibleForTesting
   RegionStateNode createRegionStateNode(RegionInfo regionInfo) {
-    RegionStateNode newNode = new RegionStateNode(regionInfo, regionInTransition);
-    RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
-    return oldNode != null ? oldNode : newNode;
+    return regionsMap.computeIfAbsent(regionInfo.getRegionName(),
+      key -> new RegionStateNode(regionInfo, regionInTransition));
   }
 
   public RegionStateNode getOrCreateRegionStateNode(RegionInfo regionInfo) {
@@ -556,7 +555,7 @@ public class RegionStates {
       // Add online servers with no assignment for the table.
       for (Map<ServerName, List<RegionInfo>> table : result.values()) {
         for (ServerName serverName : serverMap.keySet()) {
-          table.putIfAbsent(serverName, new ArrayList<>());
+          table.computeIfAbsent(serverName, key -> new ArrayList<>());
         }
       }
     } else {
@@ -677,13 +676,7 @@ public class RegionStates {
 
   public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
     final byte[] key = regionNode.getRegionInfo().getRegionName();
-    RegionFailedOpen node = regionFailedOpen.get(key);
-    if (node == null) {
-      RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
-      RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
-      node = oldNode != null ? oldNode : newNode;
-    }
-    return node;
+    return regionFailedOpen.computeIfAbsent(key, (k) -> new RegionFailedOpen(regionNode));
   }
 
   public RegionFailedOpen getFailedOpen(final RegionInfo regionInfo) {
@@ -714,13 +707,7 @@ public class RegionStates {
    * to {@link #getServerNode(ServerName)} where we can.
    */
   public ServerStateNode getOrCreateServer(final ServerName serverName) {
-    ServerStateNode node = serverMap.get(serverName);
-    if (node == null) {
-      node = new ServerStateNode(serverName);
-      ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
-      node = oldNode != null ? oldNode : node;
-    }
-    return node;
+    return serverMap.computeIfAbsent(serverName, key -> new ServerStateNode(key));
   }
 
   public void removeServer(final ServerName serverName) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
index 5b6d8c1..bbe53b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FileArchiverNotifierFactoryImpl.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas;
 
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
   private static final FileArchiverNotifierFactoryImpl DEFAULT_INSTANCE =
       new FileArchiverNotifierFactoryImpl();
   private static volatile FileArchiverNotifierFactory CURRENT_INSTANCE = DEFAULT_INSTANCE;
-  private final ConcurrentHashMap<TableName,FileArchiverNotifier> CACHE;
+  private final ConcurrentMap<TableName,FileArchiverNotifier> CACHE;
 
   private FileArchiverNotifierFactoryImpl() {
     CACHE = new ConcurrentHashMap<>();
@@ -62,15 +63,10 @@ public final class FileArchiverNotifierFactoryImpl implements FileArchiverNotifi
    * @param tn The table to obtain a notifier for
    * @return The notifier for the given {@code tablename}.
    */
-  public FileArchiverNotifier get(
-      Connection conn, Configuration conf, FileSystem fs, TableName tn) {
+  public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs,
+      TableName tn) {
     // Ensure that only one instance is exposed to callers
-    final FileArchiverNotifier newMapping = new FileArchiverNotifierImpl(conn, conf, fs, tn);
-    final FileArchiverNotifier previousMapping = CACHE.putIfAbsent(tn, newMapping);
-    if (previousMapping == null) {
-      return newMapping;
-    }
-    return previousMapping;
+    return CACHE.computeIfAbsent(tn, key -> new FileArchiverNotifierImpl(conn, conf, fs, key));
   }
 
   public int getCacheSize() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index ce26366..1c97b20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ScheduledChore;
@@ -69,10 +70,10 @@ public class QuotaCache implements Stoppable {
   // for testing purpose only, enforce the cache to be always refreshed
   static boolean TEST_FORCE_REFRESH = false;
 
-  private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
+  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
       new ConcurrentHashMap<>();
   private volatile boolean exceedThrottleQuotaEnabled = false;
   // factors used to divide cluster scope quota into machine scope quota
@@ -174,7 +175,7 @@ public class QuotaCache implements Stoppable {
    * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
    * returned and the quota request will be enqueued for the next cache refresh.
    */
-  private <K> QuotaState getQuotaState(final ConcurrentHashMap<K, QuotaState> quotasMap,
+  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap,
       final K key) {
     return computeIfAbsent(quotasMap, key, QuotaState::new, this::triggerCacheRefresh);
   }
@@ -223,17 +224,18 @@ public class QuotaCache implements Stoppable {
     protected void chore() {
       // Prefetch online tables/namespaces
       for (TableName table: ((HRegionServer)QuotaCache.this.rsServices).getOnlineTables()) {
-        if (table.isSystemTable()) continue;
-        if (!QuotaCache.this.tableQuotaCache.containsKey(table)) {
-          QuotaCache.this.tableQuotaCache.putIfAbsent(table, new QuotaState());
-        }
-        String ns = table.getNamespaceAsString();
-        if (!QuotaCache.this.namespaceQuotaCache.containsKey(ns)) {
-          QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
+        if (table.isSystemTable()) {
+          continue;
         }
+        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
+
+        final String ns = table.getNamespaceAsString();
+
+        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
       }
-      QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
-        new QuotaState());
+
+      QuotaCache.this.regionServerQuotaCache.computeIfAbsent(
+          QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
 
       updateQuotaFactors();
       fetchNamespaceQuotaState();
@@ -319,7 +321,7 @@ public class QuotaCache implements Stoppable {
     }
 
     private <K, V extends QuotaState> void fetch(final String type,
-        final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
+        final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
       long now = EnvironmentEdgeManager.currentTime();
       long refreshPeriod = getPeriod();
       long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
index 71fd89b..ee6db31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/StoreHotnessProtector.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.throttle;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,7 @@ public class StoreHotnessProtector {
   private final static int DEFAULT_PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_NUM = 100;
   private final static int DEFAULT_PARALLEL_PREPARE_PUT_STORE_MULTIPLIER = 2;
 
-  private final Map<byte[], AtomicInteger> preparePutToStoreMap =
+  private final ConcurrentMap<byte[], AtomicInteger> preparePutToStoreMap =
       new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
   private final Region region;
 
@@ -101,7 +102,7 @@ public class StoreHotnessProtector {
   public void update(Configuration conf) {
     init(conf);
     preparePutToStoreMap.clear();
-    LOG.debug("update config: " + toString());
+    LOG.debug("update config: {}", this);
   }
 
   public void start(Map<byte[], List<Cell>> familyMaps) throws RegionTooBusyException {
@@ -121,13 +122,9 @@ public class StoreHotnessProtector {
 
         //we need to try to add #preparePutCount at first because preparePutToStoreMap will be
         //cleared when changing the configuration.
-        preparePutToStoreMap.putIfAbsent(e.getKey(), new AtomicInteger());
-        AtomicInteger preparePutCounter = preparePutToStoreMap.get(e.getKey());
-        if (preparePutCounter == null) {
-          preparePutCounter = new AtomicInteger();
-          preparePutToStoreMap.putIfAbsent(e.getKey(), preparePutCounter);
-        }
-        int preparePutCount = preparePutCounter.incrementAndGet();
+        int preparePutCount = preparePutToStoreMap
+            .computeIfAbsent(e.getKey(), key -> new AtomicInteger())
+            .incrementAndGet();
         if (store.getCurrentParallelPutCount() > this.parallelPutToStoreThreadLimit
             || preparePutCount > this.parallelPreparePutToStoreThreadLimit) {
           tooBusyStore = (tooBusyStore == null ?
@@ -146,9 +143,7 @@ public class StoreHotnessProtector {
       String msg =
           "StoreTooBusy," + this.region.getRegionInfo().getRegionNameAsString() + ":" + tooBusyStore
               + " Above parallelPutToStoreThreadLimit(" + this.parallelPutToStoreThreadLimit + ")";
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg);
-      }
+      LOG.trace(msg);
       throw new RegionTooBusyException(msg);
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 683a9ab..32739b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -305,24 +305,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
-    ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
-    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
-    if (extant != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Someone has beat us to start a worker thread for wal group {}", logPeerId(),
-          walGroupId);
-      }
-    } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("{} Starting up worker for wal group {}", logPeerId(), walGroupId);
+    workerThreads.compute(walGroupId, (key, value) -> {
+      if (value != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "{} Someone has beat us to start a worker thread for wal group {}",
+              logPeerId(), key);
+        }
+        return value;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} Starting up worker for wal group {}", logPeerId(), key);
+        }
+        ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
+        ReplicationSourceWALReader walReader =
+            createNewWALReader(walGroupId, queue, worker.getStartPosition());
+        Threads.setDaemonThreadRunning(
+            walReader, Thread.currentThread().getName()
+                + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
+            this::uncaughtException);
+        worker.setWALReader(walReader);
+        worker.startup(this::uncaughtException);
+        return worker;
       }
-      ReplicationSourceWALReader walReader =
-        createNewWALReader(walGroupId, queue, worker.getStartPosition());
-      Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName() +
-        ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::uncaughtException);
-      worker.setWALReader(walReader);
-      worker.startup(this::uncaughtException);
-    }
+    });
   }
 
   @Override