You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by am...@apache.org on 2023/11/02 13:37:25 UTC
(druid) branch master updated: Fix used segment retrieval in Kill tasks (#15306)
This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new dc3213b05dd Fix used segment retrieval in Kill tasks (#15306)
dc3213b05dd is described below
commit dc3213b05dd4ca62f4273d9e14f547c791c0dbc6
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Thu Nov 2 19:07:17 2023 +0530
Fix used segment retrieval in Kill tasks (#15306)
Fix used segment retrieval in Kill tasks
---
.../common/task/KillUnusedSegmentsTask.java | 60 ++++++++++++----------
1 file changed, 32 insertions(+), 28 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 1726a3e6800..54fae94684f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.error.InvalidInput;
@@ -171,8 +172,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
- final NavigableMap<DateTime, List<TaskLock>> taskLockMap = getTaskLockMap(toolbox.getTaskActionClient());
-
// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
@@ -196,21 +195,40 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
limit,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);
+
+ RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
+ getDataSource(),
+ null,
+ ImmutableList.of(getInterval()),
+ Segments.INCLUDING_OVERSHADOWED
+ );
+ // Fetch the load specs of all used segments (including overshadowed ones) overlapping with this task's interval
+ final Set<Map<String, Object>> usedSegmentLoadSpecs =
+ new HashSet<>(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
+ .stream()
+ .map(DataSegment::getLoadSpec)
+ .collect(Collectors.toSet())
+ );
+
do {
if (nextBatchSize <= 0) {
break;
}
unusedSegments = toolbox
- .getTaskActionClient()
- .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+ .getTaskActionClient()
+ .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
+
+ // Fetch locks each time as a revocation could have occurred in between batches
+ final NavigableMap<DateTime, List<TaskLock>> taskLockMap
+ = getNonRevokedTaskLockMap(toolbox.getTaskActionClient());
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
- "Locks[%s] for task[%s] can't cover segments[%s]",
- taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
- getId(),
- unusedSegments
+ "Locks[%s] for task[%s] can't cover segments[%s]",
+ taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+ getId(),
+ unusedSegments
);
}
@@ -222,24 +240,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
- final Set<Interval> unusedSegmentIntervals = unusedSegments.stream()
- .map(DataSegment::getInterval)
- .collect(Collectors.toSet());
- final Set<Map<String, Object>> usedSegmentLoadSpecs = new HashSet<>();
- if (!unusedSegmentIntervals.isEmpty()) {
- RetrieveUsedSegmentsAction retrieveUsedSegmentsAction = new RetrieveUsedSegmentsAction(
- getDataSource(),
- null,
- unusedSegmentIntervals,
- Segments.INCLUDING_OVERSHADOWED
- );
- // Fetch the load specs of all segments overlapping with the unused segment intervals
- usedSegmentLoadSpecs.addAll(toolbox.getTaskActionClient().submit(retrieveUsedSegmentsAction)
- .stream()
- .map(DataSegment::getLoadSpec)
- .collect(Collectors.toSet())
- );
- }
// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
@@ -289,11 +289,15 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
return null != limit ? Math.min(limit - numSegmentsKilled, batchSize) : batchSize;
}
- private NavigableMap<DateTime, List<TaskLock>> getTaskLockMap(TaskActionClient client) throws IOException
+ private NavigableMap<DateTime, List<TaskLock>> getNonRevokedTaskLockMap(TaskActionClient client) throws IOException
{
final NavigableMap<DateTime, List<TaskLock>> taskLockMap = new TreeMap<>();
getTaskLocks(client).forEach(
- taskLock -> taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock)
+ taskLock -> {
+ if (!taskLock.isRevoked()) {
+ taskLockMap.computeIfAbsent(taskLock.getInterval().getStart(), k -> new ArrayList<>()).add(taskLock);
+ }
+ }
);
return taskLockMap;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org