Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/05/04 12:19:44 UTC

[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #11190: Temporarily skip compaction for locked intervals

abhishekagarwal87 commented on a change in pull request #11190:
URL: https://github.com/apache/druid/pull/11190#discussion_r625731629



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
##########
@@ -154,6 +160,17 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
           }
         }
 
+        // Skip all the locked intervals
+        LOG.debug(
+            "Skipping the following intervals for Compaction as they are currently locked: %s",
+            taskToLockedIntervals
+        );
+        taskToLockedIntervals.forEach(
+            (taskId, datasourceIntervals) -> compactionTaskIntervals
+                .computeIfAbsent(datasourceIntervals.getDatasource(), ds -> new ArrayList<>())
+                .addAll(datasourceIntervals.getIntervals())
+        );
+
         final CompactionSegmentIterator iterator =
             policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);

Review comment:
       Maybe `compactionTaskIntervals` needs to be called something else now, since it can also include intervals for which there is no lock.
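For context, the merge that this comment refers to can be sketched in isolation. This is a simplified model, not Druid's actual code: `DatasourceIntervals` here is a hypothetical stand-in for the per-task lock info, and intervals are plain strings instead of Joda `Interval` objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LockedIntervalsMerge
{
  // Hypothetical stand-in for the per-task (datasource, intervals) pair in the patch
  static class DatasourceIntervals
  {
    final String datasource;
    final List<String> intervals;

    DatasourceIntervals(String datasource, List<String> intervals)
    {
      this.datasource = datasource;
      this.intervals = intervals;
    }
  }

  // Flatten a taskId -> (datasource, intervals) map into datasource -> all locked intervals,
  // mirroring the computeIfAbsent/addAll pattern in the diff above
  static Map<String, List<String>> mergeByDatasource(Map<String, DatasourceIntervals> byTask)
  {
    final Map<String, List<String>> byDatasource = new HashMap<>();
    byTask.forEach(
        (taskId, di) -> byDatasource
            .computeIfAbsent(di.datasource, ds -> new ArrayList<>())
            .addAll(di.intervals)
    );
    return byDatasource;
  }

  public static void main(String[] args)
  {
    Map<String, DatasourceIntervals> byTask = new HashMap<>();
    byTask.put("task1", new DatasourceIntervals("wiki", List.of("2021-01-01/2021-01-02")));
    byTask.put("task2", new DatasourceIntervals("wiki", List.of("2021-01-02/2021-01-03")));

    Map<String, List<String>> merged = mergeByDatasource(byTask);
    System.out.println(merged.get("wiki").size()); // 2
  }
}
```

Note that the merged map keys are datasources, not task ids, which is part of why the reviewer suggests the variable name no longer matches its contents once non-lock intervals are added.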

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
##########
@@ -674,6 +675,64 @@ public TaskLock apply(TaskLockPosse taskLockPosse)
     }
   }
 
+  /**
+   * Gets a Map containing intervals locked by active tasks. Intervals locked
+   * by revoked TaskLocks are not included in the returned Map.
+   *
+   * @return Map from Task Id to locked intervals.
+   */
+  public Map<String, DatasourceIntervals> getLockedIntervals()
+  {
+    final Map<String, List<Interval>> taskToIntervals = new HashMap<>();
+    final Map<String, String> taskToDatasource = new HashMap<>();
+
+    // Take a lock and populate the maps
+    giant.lock();
+    try {
+      running.forEach(
+          (datasource, datasourceLocks) -> datasourceLocks.forEach(
+              (startTime, startTimeLocks) -> startTimeLocks.forEach(
+                  (interval, taskLockPosses) -> taskLockPosses.forEach(
+                      taskLockPosse -> taskLockPosse.taskIds.forEach(taskId -> {
+                        // Do not proceed if the lock is revoked
+                        if (taskLockPosse.getTaskLock().isRevoked()) {

Review comment:
       What is the rationale behind this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org