Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/06/07 12:18:39 UTC

[accumulo] branch main updated: Make DeadCompactionDetector handle network hiccups (#2132)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dc2bb7  Make DeadCompactionDetector handle network hiccups (#2132)
3dc2bb7 is described below

commit 3dc2bb736995ef9a4bd78eedaeda2e2c59585338
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Jun 7 08:18:24 2021 -0400

    Make DeadCompactionDetector handle network hiccups (#2132)
    
    Modified the DeadCompactionDetector to fail compactions only if
    they appear dead for more than two consecutive detection cycles.
    This should handle the case where a transient network issue makes
    another component briefly unreachable.
    
    Closes #2125
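
The heart of this change is a per-cycle counter keyed by compaction id. Below is a minimal, self-contained sketch of that scheme; the class name, method name, and String ids are hypothetical stand-ins for illustration, not Accumulo APIs.

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;

    // Hypothetical sketch of the "fail only after more than two
    // consecutive dead sightings" scheme used in the diff below.
    public class DeadCountingSketch {
      private final ConcurrentHashMap<String,Long> deadCounts = new ConcurrentHashMap<>();

      // Called once per detection cycle with the ids that currently look dead.
      public Set<String> cycle(Set<String> looksDead) {
        // Ids that no longer look dead drop out of the map entirely,
        // so only consecutive sightings accumulate.
        deadCounts.keySet().retainAll(looksDead);
        // Increment the sighting count for each id that still looks dead.
        looksDead.forEach(id -> deadCounts.merge(id, 1L, Long::sum));
        // Fail only ids seen dead in more than two consecutive cycles.
        Set<String> toFail = deadCounts.entrySet().stream()
            .filter(e -> e.getValue() > 2).map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        deadCounts.keySet().removeAll(toFail);
        return toFail;
      }
    }

Because retainAll(looksDead) drops any id that stopped looking dead, the count can only grow across consecutive cycles, so a single transient network hiccup never reaches the failure threshold.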
---
 .../coordinator/DeadCompactionDetector.java        | 43 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 5 deletions(-)
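
The patch also leans on the fact that ConcurrentHashMap.keySet() is a live view backed by the map, so retainAll and removeAll on it delete entries from the map itself. A tiny demonstration with made-up ids:

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class KeySetViewDemo {
      public static void main(String[] args) {
        ConcurrentHashMap<String,Long> dead = new ConcurrentHashMap<>();
        dead.put("ECID:0001", 1L);
        dead.put("ECID:0002", 3L);
        // Removing keys from the view removes the entries from the map.
        dead.keySet().retainAll(Set.of("ECID:0002"));
        System.out.println(dead); // prints {ECID:0002=3}
        dead.keySet().removeAll(Set.of("ECID:0002"));
        System.out.println(dead); // prints {}
      }
    }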

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index 1b5bd28..3b6a3b4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -21,8 +21,12 @@ package org.apache.accumulo.coordinator;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -40,13 +44,15 @@ public class DeadCompactionDetector {
 
   private final ServerContext context;
   private final CompactionCoordinator coordinator;
-  private ScheduledThreadPoolExecutor schedExecutor;
+  private final ScheduledThreadPoolExecutor schedExecutor;
+  private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
 
   public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator,
       ScheduledThreadPoolExecutor stpe) {
     this.context = context;
     this.coordinator = coordinator;
     this.schedExecutor = stpe;
+    this.deadCompactions = new ConcurrentHashMap<>();
   }
 
   private void detectDeadCompactions() {
@@ -66,6 +72,9 @@ public class DeadCompactionDetector {
         });
 
     if (tabletCompactions.isEmpty()) {
+      // Clear out dead compactions; tservers don't think anything is running
+      log.trace("Clearing the dead compaction map, no tablets have compactions running");
+      this.deadCompactions.clear();
       // no need to look for dead compactions when tablets don't have anything recorded as running
       return;
     }
@@ -74,6 +83,10 @@ public class DeadCompactionDetector {
       tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent));
     }
 
+    // Remove from the dead map any compactions that the Tablets
+    // do not think are running any more.
+    this.deadCompactions.keySet().retainAll(tabletCompactions.keySet());
+
     // Determine what compactions are currently running and remove those.
     //
     // In order for this overall algorithm to be correct and avoid race conditions, the compactor
@@ -84,21 +97,41 @@ public class DeadCompactionDetector {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and should be failed.
     // It's possible that a compaction committed while going through the steps above; if so,
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);
+    });
     coordinator.compactionFailed(tabletCompactions);
+    this.deadCompactions.keySet().removeAll(toFail);
   }
 
   public void start() {