You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by am...@apache.org on 2023/11/02 13:37:25 UTC

(druid) branch master updated: Fix used segment retrieval in Kill tasks (#15306)

This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new dc3213b05dd Fix used segment retrieval in Kill tasks (#15306)
dc3213b05dd is described below

commit dc3213b05dd4ca62f4273d9e14f547c791c0dbc6
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Thu Nov 2 19:07:17 2023 +0530

    Fix used segment retrieval in Kill tasks (#15306)
    
    Fix used segment retrieval in Kill tasks
---
 .../common/task/KillUnusedSegmentsTask.java        | 60 ++++++++++++----------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 1726a3e6800..54fae94684f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.error.InvalidInput;
@@ -171,8 +172,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-    final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());
-
     // Track stats for reporting
     int numSegmentsKilled = 0;
     int numBatchesProcessed = 0;
@@ -196,21 +195,40 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
         limit,
         numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
     );
+
+    RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
+            getDataSource(),
+            null,
+            ImmutableList.of(getInterval()),
+            Segments.INCLUDING_OVERSHADOWED
+    );
+    // Fetch the load specs of all used segments overlapping with the interval of this kill task
+    final Set<Map<String, Object>> usedSegmentLoadSpecs =
+            new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
+                    .stream()
+                    .map(DataSegment::getLoadSpec)
+                    .collect(Collectors.toSet())
+            );
+
     do {
       if (nextBatchSize <= 0) {
         break;
       }
 
       unusedSegments = toolbox
-          .getTaskActionClient()
-          .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+              .getTaskActionClient()
+              .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+
+      // Fetch locks each time as a revocation could have occurred in between batches
+      final NavigableMap<DateTime, List<TaskLock>> taskLockMap
+              = getNonRevokedTaskLockMap(toolbox.getTaskActionClient());
 
       if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
         throw new ISE(
-            "Locks[%s] for task[%s] can't cover segments[%s]",
-            taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-            getId(),
-            unusedSegments
+                "Locks[%s] for task[%s] can't cover segments[%s]",
+                taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+                getId(),
+                unusedSegments
         );
       }
 
@@ -222,24 +240,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
 
-      final Set<Interval> unusedSegmentIntervals = unusedSegments.stream()
-                                                                 .map(DataSegment::getInterval)
-                                                                 .collect(Collectors.toSet());
-      final Set<Map<String, Object>> usedSegmentLoadSpecs = new HashSet<>();
-      if (!unusedSegmentIntervals.isEmpty()) {
-        RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
-            getDataSource(),
-            null,
-            unusedSegmentIntervals,
-            Segments.INCLUDING_OVERSHADOWED
-        );
-        // Fetch the load specs of all segments overlapping with the unused segment intervals
-        usedSegmentLoadSpecs.addAll(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
-                                           .stream()
-                                           .map(DataSegment::getLoadSpec)
-                                           .collect(Collectors.toSet())
-        );
-      }
 
       // Kill segments from the deep storage only if their load specs are not being used by any used segments
       final List<DataSegment> segmentsToBeKilled = unusedSegments
@@ -289,11 +289,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
     return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
   }
 
-  private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
+  private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
   {
     final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
     getTaskLocks(client).forEach(
-        taskLock -> taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock)
+        taskLock -> {
+          if (!taskLock.isRevoked()) {
+            taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock);
+          }
+        }
     );
     return taskLockMap;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org