You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2023/05/26 05:35:09 UTC

[druid] branch master updated: Fix regression in batch segment allocation (#14337)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cde3a8b52 Fix regression in batch segment allocation (#14337)
0cde3a8b52 is described below

commit 0cde3a8b522fbd5c09222069ac51689a614b8ec2
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Fri May 26 11:04:54 2023 +0530

    Fix regression in batch segment allocation (#14337)
    
    * Improve batch segment allocation logs
    
    * Fix batch seg alloc regression
    
    * Fix logs
    
    * Fix logs
    
    * Fix tests and logs
---
 .../common/actions/SegmentAllocateRequest.java     |  13 +++
 .../common/actions/SegmentAllocationQueue.java     | 101 +++++++++++++--------
 .../common/actions/SegmentAllocateActionTest.java  |   4 -
 .../common/actions/SegmentAllocationQueueTest.java |   5 +-
 4 files changed, 78 insertions(+), 45 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
index adac7523f4..74a32c31c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java
@@ -74,4 +74,17 @@ public class SegmentAllocateRequest
   {
     return rowInterval;
   }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentAllocateRequest{" +
+           "taskId=" + task.getId() +
+           ", queryGranularity=" + action.getQueryGranularity() +
+           ", segmentGranularity=" + action.getPreferredSegmentGranularity() +
+           ", maxAttempts=" + maxAttempts +
+           ", rowInterval=" + rowInterval +
+           ", attempts=" + attempts +
+           '}';
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index d7ab0fd21f..4d51350c59 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -225,7 +225,7 @@ public class SegmentAllocationQueue
   {
     batch.key.resetQueueTime();
     if (!isLeader.get()) {
-      batch.failPendingRequests("Cannot allocate segment if not leader");
+      batch.failPendingRequests("Not leader anymore");
       return false;
     } else if (processingQueue.offer(batch.key)) {
       log.debug("Added a new batch [%s] to queue.", batch.key);
@@ -312,7 +312,7 @@ public class SegmentAllocationQueue
     while (nextKey != null && !isLeader.get()) {
       processingQueue.pollFirst();
       AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
-      nextBatch.failPendingRequests("Cannot allocate segment if not leader");
+      nextBatch.failPendingRequests("Not leader anymore");
       ++failedBatches;
 
       nextKey = processingQueue.peekFirst();
@@ -332,15 +332,13 @@ public class SegmentAllocationQueue
     if (requestBatch.isEmpty()) {
       return true;
     } else if (!isLeader.get()) {
-      requestBatch.failPendingRequests("Cannot allocate segment if not leader");
+      requestBatch.failPendingRequests("Not leader anymore");
       return true;
     }
 
     log.debug(
         "Processing [%d] requests for batch [%s], queue time [%s].",
-        requestBatch.size(),
-        requestKey,
-        requestKey.getQueueTime()
+        requestBatch.size(), requestKey, requestKey.getQueueTime()
     );
 
     final long startTimeMillis = System.currentTimeMillis();
@@ -364,7 +362,14 @@ public class SegmentAllocationQueue
     final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);
 
     if (updatedUsedSegments.equals(usedSegments)) {
-      requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments.");
+      log.warn(
+          "Completing [%d] failed requests in batch [%s] with null value as there"
+          + " are conflicting segments. Cannot retry allocation until the set of"
+          + " used segments overlapping the allocation interval [%s] changes.",
+          requestBatch.size(), requestKey, requestKey.preferredAllocationInterval
+      );
+
+      requestBatch.completePendingRequestsWithNull();
       return true;
     } else {
       log.debug("Used segments have changed. Requeuing failed requests.");
@@ -390,6 +395,7 @@ public class SegmentAllocationQueue
     // Find requests whose row interval overlaps with an existing used segment
     final Set<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
     final Set<SegmentAllocateRequest> requestsWithNoOverlappingSegment = new HashSet<>();
+    final List<SegmentAllocateRequest> requestsWithPartialOverlap = new ArrayList<>();
 
     if (usedSegments.isEmpty()) {
       requestsWithNoOverlappingSegment.addAll(allRequests);
@@ -415,38 +421,42 @@ public class SegmentAllocationQueue
           // There is no valid allocation interval for this request due to a
           // partially overlapping used segment. Need not do anything right now.
           // The request will be retried upon requeueing the batch.
+          requestsWithPartialOverlap.add(request);
         }
       }
 
       // Try to allocate segments for the identified used segment intervals.
       // Do not retry the failed requests with other intervals unless the batch is requeued.
       for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : overlapIntervalToRequests.entrySet()) {
-        successCount += allocateSegmentsForInterval(
-            entry.getKey(),
-            entry.getValue(),
-            requestBatch
-        );
+        successCount +=
+            allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
       }
     }
 
     // For requests that do not overlap with a used segment, first try to allocate
-    // using the preferred granularity, then smaller granularities
+    // using the preferred granularity, then successively smaller granularities
     final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment);
-    for (Granularity granularity :
-        Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) {
+    final List<Granularity> candidateGranularities
+        = Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity);
+    for (Granularity granularity : candidateGranularities) {
       Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
           getRequestsByInterval(pendingRequests, granularity);
 
       for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : requestsByInterval.entrySet()) {
-        successCount += allocateSegmentsForInterval(
-            entry.getKey(),
-            entry.getValue(),
-            requestBatch
-        );
+        successCount +=
+            allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
         pendingRequests.retainAll(requestBatch.getRequests());
       }
     }
 
+    if (!requestsWithPartialOverlap.isEmpty()) {
+      log.info(
+          "Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments."
+          + " These cannot be processed until the set of used segments changes. Example request: [%s]",
+          requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0)
+      );
+    }
+
     return successCount;
   }
 
@@ -474,9 +484,7 @@ public class SegmentAllocationQueue
     final AllocateRequestKey requestKey = requestBatch.key;
     log.debug(
         "Trying allocation for [%d] requests, interval [%s] in batch [%s]",
-        requests.size(),
-        tryInterval,
-        requestKey
+        requests.size(), tryInterval, requestKey
     );
 
     final List<SegmentAllocateResult> results = taskLockbox.allocateSegments(
@@ -581,7 +589,7 @@ public class SegmentAllocationQueue
     synchronized void failPendingRequests(Throwable cause)
     {
       if (!requestToFuture.isEmpty()) {
-        log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", size(), cause.getMessage(), key);
+        log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), key, cause.getMessage());
         requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
         requestToFuture.keySet().forEach(
             request -> emitTaskMetric("task/action/failed/count", 1L, request)
@@ -590,6 +598,19 @@ public class SegmentAllocationQueue
       }
     }
 
+    synchronized void completePendingRequestsWithNull()
+    {
+      if (requestToFuture.isEmpty()) {
+        return;
+      }
+
+      requestToFuture.values().forEach(future -> future.complete(null));
+      requestToFuture.keySet().forEach(
+          request -> emitTaskMetric("task/action/failed/count", 1L, request)
+      );
+      requestToFuture.clear();
+    }
+
     synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
     {
       request.incrementAttempts();
@@ -599,20 +620,17 @@ public class SegmentAllocationQueue
         requestToFuture.remove(request).complete(result.getSegmentId());
       } else if (request.canRetry()) {
         log.info(
-            "Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s",
-            request.getAttempts(),
-            result.getErrorMessage(),
-            request.getAction()
+            "Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].",
+            request.getAttempts(), result.getErrorMessage(), request.getAction()
         );
       } else {
         emitTaskMetric("task/action/failed/count", 1L, request);
         log.error(
-            "Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s",
-            request.getAttempts(),
-            result.getErrorMessage(),
-            request.getAction()
+            "Exhausted max attempts [%d] for allocation with latest error [%s]."
+            + " Completing action [%s] with a null value.",
+            request.getAttempts(), result.getErrorMessage(), request.getAction()
         );
-        requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage()));
+        requestToFuture.remove(request).complete(null);
       }
     }
 
@@ -651,6 +669,7 @@ public class SegmentAllocationQueue
     private final boolean useNonRootGenPartitionSpace;
 
     private final int hash;
+    private final String serialized;
 
     /**
      * Creates a new key for the given request. The batch for a unique key will
@@ -681,6 +700,7 @@ public class SegmentAllocationQueue
           preferredAllocationInterval,
           lockGranularity
       );
+      this.serialized = serialize();
 
       this.maxWaitTimeMillis = maxWaitTimeMillis;
     }
@@ -727,14 +747,19 @@ public class SegmentAllocationQueue
 
     @Override
     public String toString()
+    {
+      return serialized;
+    }
+
+    private String serialize()
     {
       return "{" +
-             "ds='" + dataSource + '\'' +
-             ", gr='" + groupId + '\'' +
-             ", incId=" + batchIncrementalId +
+             "datasource='" + dataSource + '\'' +
+             ", groupId='" + groupId + '\'' +
+             ", batchId=" + batchIncrementalId +
              ", lock=" + lockGranularity +
-             ", invl=" + preferredAllocationInterval +
-             ", slc=" + skipSegmentLineageCheck +
+             ", allocInterval=" + preferredAllocationInterval +
+             ", skipLineageCheck=" + skipSegmentLineageCheck +
              '}';
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index c8861a92cd..b498834fba 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -64,7 +64,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -984,9 +983,6 @@ public class SegmentAllocateActionTest
         return action.perform(task, taskActionTestKit.getTaskActionToolbox());
       }
     }
-    catch (ExecutionException e) {
-      return null;
-    }
     catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
index 974b3096f9..8adb18ddb7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java
@@ -223,8 +223,7 @@ public class SegmentAllocationQueueTest
     executor.finishNextPendingTask();
 
     Assert.assertNotNull(getSegmentId(hourSegmentFuture));
-    Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture));
-    Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage());
+    Assert.assertNull(getSegmentId(halfHourSegmentFuture));
   }
 
   @Test
@@ -309,7 +308,7 @@ public class SegmentAllocationQueueTest
 
     for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
       Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
-      Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage());
+      Assert.assertEquals("Not leader anymore", t.getMessage());
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org