You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/16 20:40:24 UTC

[accumulo] branch 1451-external-compactions-feature updated: Make synchronization tighter

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 00bde13  Make synchronization tighter
00bde13 is described below

commit 00bde136655633339a42783fd41a23ba3f324c94
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 16 20:38:54 2021 +0000

    Make synchronization tighter
    
    Instead of synchronizing on the QUEUES collection in total, synchronize
    on the QueueAndPriority objects which are used in the code and are cached
    singletons in the JVM
---
 .../coordinator/CompactionCoordinator.java         | 68 +++++++++++-----------
 1 file changed, 34 insertions(+), 34 deletions(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b4b2cb4..3f38d6c 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -84,7 +84,7 @@ public class CompactionCoordinator extends AbstractServer
 
   /* Map of external queue name -> priority -> tservers */
   private static final Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
   /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */
   private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX =
       new ConcurrentHashMap<>();
@@ -209,7 +209,7 @@ public class CompactionCoordinator extends AbstractServer
             summaries.forEach(summary -> {
               QueueAndPriority qp =
                   QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority());
-              synchronized (QUEUES) {
+              synchronized (qp) {
                 QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
                     .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi);
                 INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
@@ -264,7 +264,7 @@ public class CompactionCoordinator extends AbstractServer
         if (null != m) {
           LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
           if (null != tservers) {
-            synchronized (QUEUES) {
+            synchronized (qp) {
               tservers.remove(tsi);
               INDEX.remove(tsi);
             }
@@ -299,31 +299,31 @@ public class CompactionCoordinator extends AbstractServer
     LOG.debug("getCompactionJob " + queueName + " " + compactorAddress);
     String queue = queueName.intern();
     TExternalCompactionJob result = null;
-    // CBUG Review synchronization on QUEUES
-    synchronized (QUEUES) {
-      TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
-      if (null != m && !m.isEmpty()) {
-        while (result == null) {
-
-          // m could become empty if we have contacted all tservers in this queue and
-          // there are no compactions
-          if (m.isEmpty()) {
-            LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
-                compactorAddress);
-            result = new TExternalCompactionJob();
-            break;
-          }
-
-          // Get the first TServerInstance from the highest priority queue
-          Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
-          Long priority = entry.getKey();
-          LinkedHashSet<TServerInstance> tservers = entry.getValue();
+    TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
+    if (null != m && !m.isEmpty()) {
+      while (result == null) {
+
+        // m could become empty if we have contacted all tservers in this queue and
+        // there are no compactions
+        if (m.isEmpty()) {
+          LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+              compactorAddress);
+          result = new TExternalCompactionJob();
+          break;
+        }
 
-          if (null == tservers || tservers.isEmpty()) {
-            // Clean up the map entry when no tservers for this queue and priority
-            m.remove(entry.getKey(), entry.getValue());
-            continue;
-          } else {
+        // Get the first TServerInstance from the highest priority queue
+        Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
+        Long priority = entry.getKey();
+        LinkedHashSet<TServerInstance> tservers = entry.getValue();
+        QueueAndPriority qp = QueueAndPriority.get(queue, priority);
+
+        if (null == tservers || tservers.isEmpty()) {
+          // Clean up the map entry when no tservers for this queue and priority
+          m.remove(entry.getKey(), entry.getValue());
+          continue;
+        } else {
+          synchronized(qp) {
             TServerInstance tserver = tservers.iterator().next();
             LOG.debug("Found tserver {} with priority {} for queue {}", tserver.getHostAndPort(),
                 priority, queue);
@@ -334,9 +334,9 @@ public class CompactionCoordinator extends AbstractServer
               // CBUG This may be redundant as cleanup happens in the 'if' clause above
               m.remove(entry.getKey(), entry.getValue());
             }
-            HashSet<QueueAndPriority> qp = INDEX.get(tserver);
-            qp.remove(QueueAndPriority.get(queue, priority));
-            if (qp.isEmpty()) {
+            HashSet<QueueAndPriority> queues = INDEX.get(tserver);
+            queues.remove(QueueAndPriority.get(queue, priority));
+            if (queues.isEmpty()) {
               // Remove the tserver from the index
               INDEX.remove(tserver);
             }
@@ -368,11 +368,11 @@ public class CompactionCoordinator extends AbstractServer
             }
           }
         }
-      } else {
-        LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
-            compactorAddress);
-        result = new TExternalCompactionJob();
       }
+    } else {
+      LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue,
+          compactorAddress);
+      result = new TExternalCompactionJob();
     }
     return result;