You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2023/05/26 05:35:09 UTC
[druid] branch master updated: Fix regression in batch segment allocation (#14337)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0cde3a8b52 Fix regression in batch segment allocation (#14337)
0cde3a8b52 is described below
commit 0cde3a8b522fbd5c09222069ac51689a614b8ec2
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Fri May 26 11:04:54 2023 +0530
Fix regression in batch segment allocation (#14337)
* Improve batch segment allocation logs
* Fix batch seg alloc regression
* Fix logs
* Fix logs
* Fix tests and logs
---
.../common/actions/SegmentAllocateRequest.java | 13 +++
.../common/actions/SegmentAllocationQueue.java | 101 +++++++++++++--------
.../common/actions/SegmentAllocateActionTest.java | 4 -
.../common/actions/SegmentAllocationQueueTest.java | 5 +-
4 files changed, 78 insertions(+), 45 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
index adac7523f4..74a32c31c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
@@ -74,4 +74,17 @@ public class SegmentAllocateRequest
{
return rowInterval;
}
+
+ @Override
+ public String toString()
+ {
+ return "SegmentAllocateRequest{" +
+ "taskId=" + task.getId() +
+ ", queryGranularity=" + action.getQueryGranularity() +
+ ", segmentGranularity=" + action.getPreferredSegmentGranularity() +
+ ", maxAttempts=" + maxAttempts +
+ ", rowInterval=" + rowInterval +
+ ", attempts=" + attempts +
+ '}';
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index d7ab0fd21f..4d51350c59 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -225,7 +225,7 @@ public class SegmentAllocationQueue
{
batch.key.resetQueueTime();
if (!isLeader.get()) {
- batch.failPendingRequests("Cannot allocate segment if not leader");
+ batch.failPendingRequests("Not leader anymore");
return false;
} else if (processingQueue.offer(batch.key)) {
log.debug("Added a new batch [%s] to queue.", batch.key);
@@ -312,7 +312,7 @@ public class SegmentAllocationQueue
while (nextKey != null && !isLeader.get()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
- nextBatch.failPendingRequests("Cannot allocate segment if not leader");
+ nextBatch.failPendingRequests("Not leader anymore");
++failedBatches;
nextKey = processingQueue.peekFirst();
@@ -332,15 +332,13 @@ public class SegmentAllocationQueue
if (requestBatch.isEmpty()) {
return true;
} else if (!isLeader.get()) {
- requestBatch.failPendingRequests("Cannot allocate segment if not leader");
+ requestBatch.failPendingRequests("Not leader anymore");
return true;
}
log.debug(
"Processing [%d] requests for batch [%s], queue time [%s].",
- requestBatch.size(),
- requestKey,
- requestKey.getQueueTime()
+ requestBatch.size(), requestKey, requestKey.getQueueTime()
);
final long startTimeMillis = System.currentTimeMillis();
@@ -364,7 +362,14 @@ public class SegmentAllocationQueue
final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);
if (updatedUsedSegments.equals(usedSegments)) {
- requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments.");
+ log.warn(
+ "Completing [%d] failed requests in batch [%s] with null value as there"
+ + " are conflicting segments. Cannot retry allocation until the set of"
+ + " used segments overlapping the allocation interval [%s] changes.",
+ requestBatch.size(), requestKey, requestKey.preferredAllocationInterval
+ );
+
+ requestBatch.completePendingRequestsWithNull();
return true;
} else {
log.debug("Used segments have changed. Requeuing failed requests.");
@@ -390,6 +395,7 @@ public class SegmentAllocationQueue
// Find requests whose row interval overlaps with an existing used segment
final Set<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
final Set<SegmentAllocateRequest> requestsWithNoOverlappingSegment = new HashSet<>();
+ final List<SegmentAllocateRequest> requestsWithPartialOverlap = new ArrayList<>();
if (usedSegments.isEmpty()) {
requestsWithNoOverlappingSegment.addAll(allRequests);
@@ -415,38 +421,42 @@ public class SegmentAllocationQueue
// There is no valid allocation interval for this request due to a
// partially overlapping used segment. Need not do anything right now.
// The request will be retried upon requeueing the batch.
+ requestsWithPartialOverlap.add(request);
}
}
// Try to allocate segments for the identified used segment intervals.
// Do not retry the failed requests with other intervals unless the batch is requeued.
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : overlapIntervalToRequests.entrySet()) {
- successCount += allocateSegmentsForInterval(
- entry.getKey(),
- entry.getValue(),
- requestBatch
- );
+ successCount +=
+ allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
}
}
// For requests that do not overlap with a used segment, first try to allocate
- // using the preferred granularity, then smaller granularities
+ // using the preferred granularity, then successively smaller granularities
final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment);
- for (Granularity granularity :
- Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) {
+ final List<Granularity> candidateGranularities
+ = Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity);
+ for (Granularity granularity : candidateGranularities) {
Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
getRequestsByInterval(pendingRequests, granularity);
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : requestsByInterval.entrySet()) {
- successCount += allocateSegmentsForInterval(
- entry.getKey(),
- entry.getValue(),
- requestBatch
- );
+ successCount +=
+ allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
pendingRequests.retainAll(requestBatch.getRequests());
}
}
+ if (!requestsWithPartialOverlap.isEmpty()) {
+ log.info(
+ "Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments."
+ + " These cannot be processed until the set of used segments changes. Example request: [%s]",
+ requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0)
+ );
+ }
+
return successCount;
}
@@ -474,9 +484,7 @@ public class SegmentAllocationQueue
final AllocateRequestKey requestKey = requestBatch.key;
log.debug(
"Trying allocation for [%d] requests, interval [%s] in batch [%s]",
- requests.size(),
- tryInterval,
- requestKey
+ requests.size(), tryInterval, requestKey
);
final List<SegmentAllocateResult> results = taskLockbox.allocateSegments(
@@ -581,7 +589,7 @@ public class SegmentAllocationQueue
synchronized void failPendingRequests(Throwable cause)
{
if (!requestToFuture.isEmpty()) {
- log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", size(), cause.getMessage(), key);
+ log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), key, cause.getMessage());
requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
requestToFuture.keySet().forEach(
request -> emitTaskMetric("task/action/failed/count", 1L, request)
@@ -590,6 +598,19 @@ public class SegmentAllocationQueue
}
}
+ synchronized void completePendingRequestsWithNull()
+ {
+ if (requestToFuture.isEmpty()) {
+ return;
+ }
+
+ requestToFuture.values().forEach(future -> future.complete(null));
+ requestToFuture.keySet().forEach(
+ request -> emitTaskMetric("task/action/failed/count", 1L, request)
+ );
+ requestToFuture.clear();
+ }
+
synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
{
request.incrementAttempts();
@@ -599,20 +620,17 @@ public class SegmentAllocationQueue
requestToFuture.remove(request).complete(result.getSegmentId());
} else if (request.canRetry()) {
log.info(
- "Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s",
- request.getAttempts(),
- result.getErrorMessage(),
- request.getAction()
+ "Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].",
+ request.getAttempts(), result.getErrorMessage(), request.getAction()
);
} else {
emitTaskMetric("task/action/failed/count", 1L, request);
log.error(
- "Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s",
- request.getAttempts(),
- result.getErrorMessage(),
- request.getAction()
+ "Exhausted max attempts [%d] for allocation with latest error [%s]."
+ + " Completing action [%s] with a null value.",
+ request.getAttempts(), result.getErrorMessage(), request.getAction()
);
- requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage()));
+ requestToFuture.remove(request).complete(null);
}
}
@@ -651,6 +669,7 @@ public class SegmentAllocationQueue
private final boolean useNonRootGenPartitionSpace;
private final int hash;
+ private final String serialized;
/**
* Creates a new key for the given request. The batch for a unique key will
@@ -681,6 +700,7 @@ public class SegmentAllocationQueue
preferredAllocationInterval,
lockGranularity
);
+ this.serialized = serialize();
this.maxWaitTimeMillis = maxWaitTimeMillis;
}
@@ -727,14 +747,19 @@ public class SegmentAllocationQueue
@Override
public String toString()
+ {
+ return serialized;
+ }
+
+ private String serialize()
{
return "{" +
- "ds='" + dataSource + '\'' +
- ", gr='" + groupId + '\'' +
- ", incId=" + batchIncrementalId +
+ "datasource='" + dataSource + '\'' +
+ ", groupId='" + groupId + '\'' +
+ ", batchId=" + batchIncrementalId +
", lock=" + lockGranularity +
- ", invl=" + preferredAllocationInterval +
- ", slc=" + skipSegmentLineageCheck +
+ ", allocInterval=" + preferredAllocationInterval +
+ ", skipLineageCheck=" + skipSegmentLineageCheck +
'}';
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index c8861a92cd..b498834fba 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -64,7 +64,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -984,9 +983,6 @@ public class SegmentAllocateActionTest
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
}
}
- catch (ExecutionException e) {
- return null;
- }
catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 974b3096f9..8adb18ddb7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -223,8 +223,7 @@ public class SegmentAllocationQueueTest
executor.finishNextPendingTask();
Assert.assertNotNull(getSegmentId(hourSegmentFuture));
- Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture));
- Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage());
+ Assert.assertNull(getSegmentId(halfHourSegmentFuture));
}
@Test
@@ -309,7 +308,7 @@ public class SegmentAllocationQueueTest
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
- Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage());
+ Assert.assertEquals("Not leader anymore", t.getMessage());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org