Posted to commits@druid.apache.org by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org> on 2024/01/14 13:50:43 UTC

[PR] Release unneeded append locks after acquiring a new superseding append lock (druid)

AmatyaAvadhanula opened a new pull request, #15682:
URL: https://github.com/apache/druid/pull/15682

   **Changes:**
   
   1) When a task lock posse is created, add the task to it immediately. This helps avoid leaving behind phantom posses with no associated tasks, and simplifies the changes made as part of https://github.com/apache/druid/pull/14966.
     Previously, a task lock posse could be created without the task ever being added to it, for example because of a failure during segment allocation. This could leave behind an empty posse holding a phantom lock that blocks ingestion, and the only workaround was to restart the Overlord. This PR adds the task to its requested posse immediately, so the task lockbox can clean the posse up when `unlock` is called.
   
   2) When a task acquires an appending task lock, check whether the new lock supersedes any of the conflicting locks. If so, release the old, unneeded locks held by the task.
      Consider an appending lock acquired with hour granularity by a streaming task running with concurrent append and replace. If a concurrent replacing task increments the version of the segments and changes the granularity from hour to day, the task's next allocation for the same timestamp creates a day lock that supersedes the existing hour lock. If the task holds both locks, the locks are in an invalid state that can cause the transactional append action to fail. The change in this PR prevents that state from occurring.
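   The posse bookkeeping described in item 1 can be sketched as a small, hypothetical Java model. It is not Druid's `TaskLockbox`: the `PosseSketch`, `LockPosse` and `Lockbox` classes, the string intervals, and the two-argument `createOrFindLockPosse(interval, taskId)` (named after the method in the diff, but with a simplified signature) are assumptions for illustration only. It shows the invariant the change aims for: because the task joins the posse in the same step that finds or creates it, a later `unlock` can always find the task and drop the posse once it is empty.

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical, simplified model of a lockbox; not Druid's actual TaskLockbox API.
   class PosseSketch
   {
     /** A lock on one interval, shared by a set of tasks (the "posse"). */
     static final class LockPosse
     {
       final String interval;
       final Set<String> taskIds = new HashSet<>();

       LockPosse(String interval)
       {
         this.interval = interval;
       }
     }

     static final class Lockbox
     {
       final List<LockPosse> posses = new ArrayList<>();

       /**
        * Finds or creates the posse for the interval and adds the task to it in the
        * same step, so a posse is never left without the task that requested it.
        */
       LockPosse createOrFindLockPosse(String interval, String taskId)
       {
         LockPosse posse = null;
         for (LockPosse candidate : posses) {
           if (candidate.interval.equals(interval)) {
             posse = candidate;
             break;
           }
         }
         if (posse == null) {
           posse = new LockPosse(interval);
           posses.add(posse);
         }
         posse.taskIds.add(taskId);
         return posse;
       }

       /** Removes the task from the posse for the interval and drops posses left empty. */
       void unlock(String taskId, String interval)
       {
         posses.removeIf(posse -> {
           if (posse.interval.equals(interval)) {
             posse.taskIds.remove(taskId);
           }
           return posse.taskIds.isEmpty();
         });
       }
     }

     public static void main(String[] args)
     {
       Lockbox lockbox = new Lockbox();
       lockbox.createOrFindLockPosse("2024-01-01T00/2024-01-01T01", "task-1");
       // Even if segment allocation fails at this point, the task is already in the
       // posse, so unlock() finds it and no phantom posse is left behind.
       lockbox.unlock("task-1", "2024-01-01T00/2024-01-01T01");
       System.out.println(lockbox.posses.size()); // prints 0
     }
   }
   ```

   In this sketch a second task requesting the same interval joins the existing posse, and the posse is removed only after its last task unlocks.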
   
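   The superseding-lock cleanup described in item 2 can be sketched the same way. Again this is a hypothetical model: `SupersedingLockSketch`, `AppendLock` and `acquireAppendLock` are not Druid classes or methods, and the sketch assumes, for illustration only, that a new lock "supersedes" an old one when both belong to the same task and the new lock's interval fully contains the old one's. The example replays the hour-to-day scenario from item 2.

   ```java
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified model; not Druid's TaskLockbox or TaskLock classes.
   class SupersedingLockSketch
   {
     /** An append lock held by one task on the half-open interval [start, end). */
     static final class AppendLock
     {
       final String taskId;
       final Instant start;
       final Instant end;

       AppendLock(String taskId, Instant start, Instant end)
       {
         this.taskId = taskId;
         this.start = start;
         this.end = end;
       }

       /** Assumed rule: a lock supersedes another if its interval fully contains the other's. */
       boolean supersedes(AppendLock other)
       {
         return !start.isAfter(other.start) && !end.isBefore(other.end);
       }
     }

     final List<AppendLock> heldLocks = new ArrayList<>();

     /** Acquires the new lock, then releases the same task's older locks that it supersedes. */
     void acquireAppendLock(AppendLock newLock)
     {
       heldLocks.add(newLock);
       heldLocks.removeIf(
           old -> old != newLock
                  && old.taskId.equals(newLock.taskId)
                  && newLock.supersedes(old)
       );
     }

     public static void main(String[] args)
     {
       SupersedingLockSketch lockbox = new SupersedingLockSketch();
       Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
       // The streaming task first allocates under an HOUR lock.
       lockbox.acquireAppendLock(new AppendLock("task-1", t0, t0.plus(1, ChronoUnit.HOURS)));
       // A concurrent replace moves the data to DAY granularity, so the next
       // allocation for the same timestamp needs a DAY lock.
       lockbox.acquireAppendLock(new AppendLock("task-1", t0, t0.plus(1, ChronoUnit.DAYS)));
       System.out.println(lockbox.heldLocks.size()); // prints 1: only the DAY lock is held
     }
   }
   ```

   In this sketch the new lock is added before the old one is released, so the task is never left without a lock covering the timestamp, and locks held by other tasks are never touched.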
   
   This PR has:
   
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Release unneeded append locks after acquiring a new superseding append lock (druid)

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #15682:
URL: https://github.com/apache/druid/pull/15682#discussion_r1469609669


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -549,60 +514,14 @@ private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunk
     }
   }
 
-  /**
-   * Adds the task to the found lock posse if not already added and updates
-   * in the metadata store. Marks the segment allocation as failed if the update
-   * did not succeed.
-   */
-  private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean isTimeChunkLock)
-  {
-    final Task task = holder.task;
-    final TaskLock acquiredLock = holder.acquiredLock;
-
-    if (holder.taskLockPosse.addTask(task)) {
-      log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock);
-
-      // This can also be batched later
-      boolean success = updateLockInStorage(task, acquiredLock);
-      if (success) {
-        holder.markSucceeded();
-      } else {
-        final Integer partitionId = isTimeChunkLock
-                                    ? null : ((SegmentLock) acquiredLock).getPartitionId();
-        unlock(task, holder.lockRequestInterval, partitionId);
-        holder.markFailed("Could not update task lock in metadata store.");
-      }
-    } else {
-      log.info("Task [%s] already present in TaskLock [%s]", task.getId(), acquiredLock.getGroupId());
-      holder.markSucceeded();
-    }
-  }
-
-  private boolean updateLockInStorage(Task task, TaskLock taskLock)
-  {
-    try {
-      taskStorage.addLock(task.getId(), taskLock);
-      return true;
-    }
-    catch (Exception e) {
-      log.makeAlert("Failed to persist lock in storage")
-         .addData("task", task.getId())
-         .addData("dataSource", taskLock.getDataSource())
-         .addData("interval", taskLock.getInterval())
-         .addData("version", taskLock.getVersion())
-         .emit();
-
-      return false;
-    }
-  }
-
-  private TaskLockPosse createOrFindLockPosse(LockRequest request)
+  private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)

Review Comment:
   Please add some comments to describe the logic. 



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -499,11 +468,7 @@ public List<SegmentAllocateResult> allocateSegments(
         allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
         holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
       }
-      holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock));
-    }
-    catch (Exception e) {
-      holderList.clearStaleLocks(this);
-      throw e;
+      holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);

Review Comment:
   why is this changed? 






Re: [PR] Release unneeded append locks after acquiring a new superseding append lock (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula merged PR #15682:
URL: https://github.com/apache/druid/pull/15682

