Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/18 10:34:17 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

cadonna commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596720028



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -288,50 +277,31 @@ private String logPrefix() {
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId task id
      * @return true if successful
-     * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
      */
-    synchronized boolean lock(final TaskId taskId) throws IOException {
+    synchronized boolean lock(final TaskId taskId) {
         if (!hasPersistentStores) {
             return true;
         }
 
-        final File lockFile;
-        // we already have the lock so bail out here
-        final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
-            return true;
-        } else if (lockAndOwner != null) {
-            // another thread owns the lock
-            return false;
-        }
-
-        try {
-            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-        } catch (final ProcessorStateException e) {
-            // directoryForTask could be throwing an exception if another thread
-            // has concurrently deleted the directory
-            return false;
-        }
-
-        final FileChannel channel;
-
-        try {
-            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
-        } catch (final NoSuchFileException e) {
-            // FileChannel.open(..) could throw NoSuchFileException when there is another thread
-            // concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock
-            // file, in this case we will return immediately indicating locking failed.
+        final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+        if (lockOwner != null) {
+            if (lockOwner.equals(Thread.currentThread().getName())) {
+                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+                // we already own the lock
+                return true;
+            } else {
+                // another thread owns the lock
+                return false;
+            }
+        } else if (!stateDir.exists()) {

Review comment:
       I am wondering whether we should throw an `IllegalStateException` here, because requesting a lock on a task directory inside a state directory that does not exist seems illegal to me.
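To illustrate the suggestion, here is a minimal sketch of the in-memory lock with the proposed `IllegalStateException`. It is not the PR's code. `InMemoryTaskLocks` is a hypothetical class, task ids are simplified to `String` instead of `TaskId`, and the `unlock()` shown is an assumption about how the owner map would be cleared.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StateDirectory's in-memory task locking (TaskId simplified to String).
final class InMemoryTaskLocks {
    // Maps a locked task id to the name of the stream thread that owns the lock.
    private final Map<String, String> lockedTasksToStreamThreadOwner = new HashMap<>();
    private final File stateDir;
    private final boolean hasPersistentStores;

    InMemoryTaskLocks(final File stateDir, final boolean hasPersistentStores) {
        this.stateDir = stateDir;
        this.hasPersistentStores = hasPersistentStores;
    }

    synchronized boolean lock(final String taskId) {
        if (!hasPersistentStores) {
            return true;
        }
        final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
        if (lockOwner != null) {
            // Re-entrant for the owning thread, refused for any other thread.
            return lockOwner.equals(Thread.currentThread().getName());
        } else if (!stateDir.exists()) {
            // The reviewer's suggestion: a missing state directory is a programming error.
            throw new IllegalStateException("State directory " + stateDir + " does not exist");
        } else {
            lockedTasksToStreamThreadOwner.put(taskId, Thread.currentThread().getName());
            return true;
        }
    }

    // Assumed behavior: only the owning thread can release the lock.
    synchronized void unlock(final String taskId) {
        final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
        if (lockOwner != null && lockOwner.equals(Thread.currentThread().getName())) {
            lockedTasksToStreamThreadOwner.remove(taskId);
        }
    }
}
```

Returning `false` in that branch instead would make a missing state directory indistinguishable from another thread holding the lock, since `false` already means "another thread owns the lock".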

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
                         exception
                     );
                 } finally {
-                    try {
-                        unlock(id);
-                    } catch (final IOException exception) {
-                        log.warn(
-                            String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " +
-                                "state directory %s for task %s:", logPrefix(), dirName, id),
-                            exception
-                        );
-                    }
+                    unlock(id);

Review comment:
       I guess you could make the same change on line 474 in `cleanRemovedTasksCalledByUser()`, couldn't you?
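For reference, a minimal sketch of the pattern this change enables: once `unlock()` no longer declares `IOException`, the `finally` block can call it directly. `TaskDirectoryCleaner`, `cleanRemovedTasks()`, and the simplified `lock()`/`unlock()` are hypothetical, and the sketch omits any `cleanupDelayMs` age check and the per-thread lock ownership of the real `StateDirectory`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical cleaner illustrating unlock() in a finally block without a nested try/catch.
final class TaskDirectoryCleaner {
    private final Set<String> lockedTasks = new HashSet<>();

    synchronized boolean lock(final String taskId) {
        return lockedTasks.add(taskId);
    }

    // Like the PR's unlock(), this declares no checked exception.
    synchronized void unlock(final String taskId) {
        lockedTasks.remove(taskId);
    }

    // Deletes every task directory under stateDir that is not in activeTaskIds and returns the deleted ids.
    List<String> cleanRemovedTasks(final File stateDir, final Set<String> activeTaskIds) {
        final List<String> deleted = new ArrayList<>();
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return deleted;
        }
        for (final File taskDir : taskDirs) {
            final String id = taskDir.getName();
            if (activeTaskIds.contains(id) || !lock(id)) {
                continue;
            }
            try {
                deleteRecursively(taskDir.toPath());
                deleted.add(id);
            } catch (final IOException exception) {
                System.err.println("Failed to delete obsolete state directory for task " + id + ": " + exception);
            } finally {
                unlock(id); // cannot throw IOException any more, so no nested try/catch
            }
        }
        return deleted;
    }

    // Deletes children before parents by visiting paths in reverse lexicographic order.
    private static void deleteRecursively(final Path root) throws IOException {
        final List<Path> ordered = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(ordered::add);
        }
        for (final Path path : ordered) {
            Files.delete(path);
        }
    }
}
```

With the checked exception gone, the nested `try`/`catch` that used to log and swallow unlock failures has nothing left to catch.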

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##########
@@ -240,32 +192,30 @@ public void testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOEx
     }
 
     @Test
-    public void testCloseStateManagerThrowsExceptionWhenDirty() throws IOException {
+    public void testCloseStateManagerThrowsExceptionWhenDirty() {
         expect(stateManager.taskId()).andReturn(taskId);
 
         expect(stateDirectory.lock(taskId)).andReturn(true);
 
         stateManager.close();
-        expectLastCall();
+        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
 
         stateDirectory.unlock(taskId);
-        expectLastCall().andThrow(new IOException("Timeout"));
+        expectLastCall();

Review comment:
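       You can probably remove this `expectLastCall()`: for a void method, calling `stateDirectory.unlock(taskId)` in record state already records the expectation.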
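The updated test expects `stateManager.close()` to throw while `stateDirectory.unlock(taskId)` is still invoked. Below is a minimal, dependency-free sketch of that close-then-unlock contract, assuming hypothetical `StateManager` and `TaskLocks` interfaces in place of the real classes and a plain `RuntimeException` in place of `ProcessorStateException`. It is not the actual `StateManagerUtil` code.

```java
// Hypothetical sketch of the close-then-unlock contract exercised by the test.
final class CloseThenUnlockSketch {
    // Stand-in for ProcessorStateManager.
    interface StateManager {
        void close();
    }

    // Stand-in for StateDirectory's lock()/unlock() after this PR (no IOException).
    interface TaskLocks {
        boolean lock(String taskId);

        void unlock(String taskId);
    }

    static void closeStateManager(final String taskId, final StateManager stateManager, final TaskLocks locks) {
        if (locks.lock(taskId)) {
            try {
                stateManager.close();
            } finally {
                // Runs whether or not close() threw, so the task lock is never leaked;
                // an exception from close() still propagates to the caller.
                locks.unlock(taskId);
            }
        }
    }

    private CloseThenUnlockSketch() {
    }
}
```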




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org