Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/06/07 12:18:39 UTC
[accumulo] branch main updated: Make DeadCompactionDetector handle network hiccups (#2132)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 3dc2bb7 Make DeadCompactionDetector handle network hiccups (#2132)
3dc2bb7 is described below
commit 3dc2bb736995ef9a4bd78eedaeda2e2c59585338
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Jun 7 08:18:24 2021 -0400
Make DeadCompactionDetector handle network hiccups (#2132)
Modified the DeadCompactionDetector to fail compactions only if
they have been dead for more than two cycles. This should handle
the case where a transient network issue occurs while talking
to another component.
Closes #2125
---
.../coordinator/DeadCompactionDetector.java | 43 +++++++++++++++++++---
1 file changed, 38 insertions(+), 5 deletions(-)
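
As a rough illustration of the idea behind this change, the sketch below shows a
generic "fail only after more than two suspicious cycles" tracker. It is an
assumption-laden simplification: the class name DeadWorkTracker, the detect()
method, and its recorded/confirmed parameters are illustrative and do not exist
in Accumulo; the real DeadCompactionDetector works against tablet metadata, the
compaction coordinator, and Ample, as shown in the diff below.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Illustrative only: counts consecutive cycles in which an id looks dead and
// reports it for failure once the count exceeds two, mirroring the commit's
// tolerance for transient network hiccups.
public class DeadWorkTracker {

  private final ConcurrentHashMap<String,Long> suspectCounts = new ConcurrentHashMap<>();

  // recorded  = ids the tablets believe are running
  // confirmed = ids confirmed running (or committing) elsewhere this cycle
  public Set<String> detect(Set<String> recorded, Set<String> confirmed) {
    if (recorded.isEmpty()) {
      // Nothing is recorded as running, so forget all prior suspicions.
      suspectCounts.clear();
      return Set.of();
    }
    // Drop suspicions for ids the tablets no longer report at all.
    suspectCounts.keySet().retainAll(recorded);
    // Anything confirmed is clearly alive; reset its count.
    confirmed.forEach(suspectCounts::remove);
    // Everything still recorded but not confirmed looks dead this cycle.
    recorded.stream().filter(id -> !confirmed.contains(id))
        .forEach(id -> suspectCounts.merge(id, 1L, Long::sum));
    // Fail only ids that have looked dead for more than two cycles, so a
    // single missed check caused by a network hiccup is tolerated.
    Set<String> toFail = suspectCounts.entrySet().stream()
        .filter(e -> e.getValue() > 2).map(Map.Entry::getKey)
        .collect(Collectors.toSet());
    suspectCounts.keySet().removeAll(toFail);
    return toFail;
  }
}

Each call to detect() corresponds to one detection cycle, so an id has to look
dead on at least three consecutive cycles before it is reported, matching the
"more than two cycles" behaviour the commit message describes.
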
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index 1b5bd28..3b6a3b4 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -21,8 +21,12 @@ package org.apache.accumulo.coordinator;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -40,13 +44,15 @@ public class DeadCompactionDetector {
private final ServerContext context;
private final CompactionCoordinator coordinator;
- private ScheduledThreadPoolExecutor schedExecutor;
+ private final ScheduledThreadPoolExecutor schedExecutor;
+ private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator,
ScheduledThreadPoolExecutor stpe) {
this.context = context;
this.coordinator = coordinator;
this.schedExecutor = stpe;
+ this.deadCompactions = new ConcurrentHashMap<>();
}
private void detectDeadCompactions() {
@@ -66,6 +72,9 @@ public class DeadCompactionDetector {
});
if (tabletCompactions.isEmpty()) {
+ // Clear out dead compactions; the tservers don't think anything is running
+ log.trace("Clearing the dead compaction map, no tablets have compactions running");
+ this.deadCompactions.clear();
// no need to look for dead compactions when tablets don't have anything recorded as running
return;
}
@@ -74,6 +83,10 @@ public class DeadCompactionDetector {
tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent));
}
+ // Remove from the dead map any compactions that the tablets
+ // do not think are running any more.
+ this.deadCompactions.keySet().retainAll(tabletCompactions.keySet());
+
// Determine what compactions are currently running and remove those.
//
// In order for this overall algorithm to be correct and avoid race conditions, the compactor
@@ -84,21 +97,41 @@ public class DeadCompactionDetector {
running.forEach((ecid) -> {
if (tabletCompactions.remove(ecid) != null) {
- log.trace("Removed {} running on a compactor", ecid);
+ log.trace("Removed compaction {} running on a compactor", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's running on a compactor", ecid);
}
});
// Determine which compactions are currently committing and remove those
context.getAmple().getExternalCompactionFinalStates()
- .map(ecfs -> ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+ .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+ if (tabletCompactions.remove(ecid) != null) {
+ log.trace("Removed compaction {} that is committing", ecid);
+ }
+ if (this.deadCompactions.remove(ecid) != null) {
+ log.trace("Removed {} from the dead compaction map, it's committing", ecid);
+ }
+ });
- tabletCompactions
- .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", ecid, extent));
+ tabletCompactions.forEach((ecid, extent) -> {
+ log.debug("Possible dead compaction detected {} {}", ecid, extent);
+ this.deadCompactions.merge(ecid, 1L, Long::sum);
+ });
// Everything left in tabletCompactions is no longer running anywhere and should be failed.
// It's possible that a compaction committed while going through the steps above; if so, then
// that is ok and marking it failed will end up being a no-op.
+ Set<ExternalCompactionId> toFail =
+ this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 2).map(e -> e.getKey())
+ .collect(Collectors.toCollection(TreeSet::new));
+ tabletCompactions.keySet().retainAll(toFail);
+ tabletCompactions.forEach((eci, v) -> {
+ log.warn("Compaction {} believed to be dead, failing it.", eci);
+ });
coordinator.compactionFailed(tabletCompactions);
+ this.deadCompactions.keySet().removeAll(toFail);
}
public void start() {