You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "sacha barber (JIRA)" <ji...@apache.org> on 2018/12/16 20:29:00 UTC

[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

    [ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722566#comment-16722566 ] 

sacha barber commented on KAFKA-6647:
-------------------------------------

I would also like to add this seems to be caused by the 

TopologyTestDriver.close

 

if I add a method like this (scala sorry)

 

def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

{code}

def cleanup(props:Properties, testDriver: TopologyTestDriver) = {

 try {
  testDriver.close
 } catch {
    case e: Exception => {
      delete(new File("C:\\data\\kafka-streams"))
    }
  }
}

def delete(file: File) {
  if (file.isDirectory)
    Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
    file.delete
  }

{code}

 

I see the Exception others are talking about above getting caught for the TopologyTestDriver close() call, But then I just resort to using regular 

java.io to do the actual delete for my tests. This does get my tests to pass ok, but why cant the Kafka code do this on windows, if my simple tests code works. I read the part about how windows will only delete file on next file assignment, but to my eyes my simple tests using delete worked here, whilst Kafka TopologyTestDriver close() did not

 

I am using Windows 10.0, and am using Kafka 2.1.0

 

And have changed my state directory to this one

 

{code}

props.put(StreamsConfig.STATE_DIR_CONFIG, s"C:\\data\\kafka-streams".asInstanceOf[Object])

{code}

 

Any ideas when this will get fixed properly?

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6647
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6647
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.1
>         Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>            Reporter: George Bloggs
>            Priority: Minor
>              Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream the StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>                   if (lock(id, 0)) {
>                         long now = time.milliseconds();
>                         long lastModifiedMs = taskDir.lastModified();
>                         if (now > lastModifiedMs + cleanupDelayMs) {
>                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>                             Utils.delete(taskDir);
>                         }
>                     }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that subsequently is going to be deleted. If the .lock file already exists from a previous run the attempt to delete the .lock file fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will then attempt to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory       : stream-thread [restartedMain] Failed to lock the state directory due to an unexpected exception)
> This seems to then cause issues using streams from a topic to an inMemory store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)