Posted to jira@kafka.apache.org by "Adrian McCague (JIRA)" <ji...@apache.org> on 2018/07/24 10:24:00 UTC

[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

    [ https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554059#comment-16554059 ] 

Adrian McCague commented on KAFKA-6767:
---------------------------------------

Hi [~guozhang], we are seeing this issue as well, relatively frequently (Streams 1.1.0). Here is my case:
{code:java}
task [1_1] Failed to write offset checkpoint file to /data/my-topology/1_1/.checkpoint: {}
java.io.FileNotFoundException: /data/my-topology/1_1/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open0(FileOutputStream.java)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
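For anyone reproducing this outside Streams: the {{FileNotFoundException ... (No such file or directory)}} in the trace is what {{java.io.FileOutputStream}} throws when the parent directory of the file it is asked to create does not exist, because the constructor never creates missing directories. A minimal sketch, assuming a hypothetical task directory {{1_1}} under a fresh temp directory and a hypothetical class name {{MissingParentRepro}}:

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

public class MissingParentRepro {
    // Tries to open "<stateDir>/1_1/.checkpoint.tmp" without creating "1_1" first;
    // returns true if the JDK refuses with FileNotFoundException
    static boolean openFailsWhenParentMissing(File stateDir) throws IOException {
        File taskDir = new File(stateDir, "1_1"); // hypothetical task directory, never created
        File temp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            return false; // only reached if the parent directory already existed
        } catch (FileNotFoundException e) {
            return true; // same exception type as in the trace above
        }
    }

    public static void main(String[] args) throws IOException {
        File stateDir = Files.createTempDirectory("streams-state").toFile();
        System.out.println(openFailsWhenParentMissing(stateDir)); // prints true
    }
}
```

So the write fails as soon as the task directory is gone, regardless of what the offsets map contains.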
This is the sub-topology:
{code:java}
Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000011 (topics: [x])
      --> KSTREAM-PEEK-0000000012
    Source: KSTREAM-SOURCE-0000000015 (topics: [y])
      --> KSTREAM-PEEK-0000000016
    Processor: KSTREAM-PEEK-0000000012 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000013
      <-- KSTREAM-SOURCE-0000000011
    Processor: KSTREAM-PEEK-0000000016 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000017
      <-- KSTREAM-SOURCE-0000000015
    Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
      --> KSTREAM-MAPVALUES-0000000014
      <-- KSTREAM-PEEK-0000000012
    Processor: KSTREAM-KEY-SELECT-0000000017 (stores: [])
      --> KSTREAM-MAPVALUES-0000000018
      <-- KSTREAM-PEEK-0000000016
    Processor: KSTREAM-MAPVALUES-0000000014 (stores: [])
      --> KSTREAM-MERGE-0000000019
      <-- KSTREAM-KEY-SELECT-0000000013
    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])
      --> KSTREAM-MERGE-0000000019
      <-- KSTREAM-KEY-SELECT-0000000017
    Processor: KSTREAM-MERGE-0000000019 (stores: [])
      --> KSTREAM-FILTER-0000000022
      <-- KSTREAM-MAPVALUES-0000000014, KSTREAM-MAPVALUES-0000000018
    Processor: KSTREAM-FILTER-0000000022 (stores: [])
      --> KSTREAM-SINK-0000000021
      <-- KSTREAM-MERGE-0000000019
    Sink: KSTREAM-SINK-0000000021 (topic: z-store-repartition)
      <-- KSTREAM-FILTER-0000000022{code}
So I believe this supports your theory that stateless tasks are attempting to checkpoint: every processor in this sub-topology lists {{(stores: [])}}. In this case the final sink writes to the repartition topic that feeds a DSL aggregation, which may help narrow down the bug.
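The "stateless" reading rests on every processor reporting an empty store list. A hypothetical helper (not part of Kafka Streams) can check that mechanically on the text printed by {{Topology#describe()}}; in code, {{TopologyDescription.Processor#stores()}} exposes the same information without parsing. The sample input copies three nodes from the description above, and the store name {{z-store}} in the second call is made up for contrast:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StatelessSubtopologyCheck {
    // Matches the "(stores: [...])" suffix printed after each processor node
    private static final Pattern STORES = Pattern.compile("\\(stores: \\[([^\\]]*)\\]\\)");

    // Collects every store name declared in a description; an empty result means no processor has a store
    static List<String> declaredStores(String description) {
        List<String> stores = new ArrayList<>();
        Matcher m = STORES.matcher(description);
        while (m.find()) {
            String names = m.group(1).trim();
            if (names.isEmpty()) {
                continue; // "(stores: [])": this processor declares no store
            }
            for (String name : names.split(",")) {
                stores.add(name.trim());
            }
        }
        return stores;
    }

    public static void main(String[] args) {
        // Three nodes copied from the sub-topology 1 description above
        String subtopology1 = String.join("\n",
                "Processor: KSTREAM-PEEK-0000000012 (stores: [])",
                "Processor: KSTREAM-MERGE-0000000019 (stores: [])",
                "Processor: KSTREAM-FILTER-0000000022 (stores: [])");
        System.out.println(declaredStores(subtopology1)); // prints []

        // Hypothetical stateful node, for contrast
        System.out.println(declaredStores("Processor: KSTREAM-AGGREGATE-0000000023 (stores: [z-store])")); // prints [z-store]
    }
}
```

An empty list for sub-topology 1 means its tasks have no store whose offsets would obviously need a checkpoint file.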

> OffsetCheckpoint write assumes parent directory exists
> ------------------------------------------------------
>
>                 Key: KAFKA-6767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6767
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Steven Schlansker
>            Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e., if an instance dies, it is recreated from scratch rather than reusing the existing RocksDB data).
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
> at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
> at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}} does not exist (although many other partitions do).
>  
> Looking at the {{OffsetCheckpoint.write}} method, it appears to open a new checkpoint file without first ensuring that the parent directory exists.
>  
> {code:java}
>     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
>         // if there is no offsets, skip writing the file to save disk IOs
>         if (offsets.isEmpty()) {
>             return;
>         }
>         synchronized (lock) {
>             // write to temp file and then swap with the existing file
>             final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
>  
> Either the {{OffsetCheckpoint}} class should create the directories if needed, or some precondition of calling it should ensure that they exist.
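One way to implement the first option is to create the parent directory right before opening the temp file. The sketch below is hypothetical and is not Kafka's actual fix: the class name {{DirectoryCreatingCheckpoint}}, the {{Map<String, Long>}} keys, and the one-line-per-entry format are simplifications, and the real checkpoint file layout is not reproduced.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.Map;

public class DirectoryCreatingCheckpoint {
    private final Path file;

    DirectoryCreatingCheckpoint(Path file) {
        this.file = file.toAbsolutePath();
    }

    // Writes one "key offset" line per entry (simplified format) via a temp file and a swap
    public synchronized void write(Map<String, Long> offsets) throws IOException {
        if (offsets.isEmpty()) {
            return; // as in the original: nothing to record, so skip the disk IO
        }
        Path parent = file.getParent();
        // The missing step: make sure the task directory exists (no-op if it already does)
        Files.createDirectories(parent);
        Path temp = parent.resolve(file.getFileName() + ".tmp");
        try (BufferedWriter w = Files.newBufferedWriter(temp, StandardCharsets.UTF_8)) {
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                w.write(e.getKey() + " " + e.getValue());
                w.newLine();
            }
        }
        try {
            // Atomic rename where the file system supports it
            Files.move(temp, file, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException ex) {
            // Fallback: non-atomic replace of any existing checkpoint
            Files.move(temp, file, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("streams-state");
        // Task directory "0_11" does not exist yet, as in the report
        Path checkpoint = stateDir.resolve("0_11").resolve(".checkpoint");
        // Hypothetical key "my-topic 11" (topic and partition) with offset 42
        new DirectoryCreatingCheckpoint(checkpoint).write(Collections.singletonMap("my-topic 11", 42L));
        System.out.println(Files.readAllLines(checkpoint, StandardCharsets.UTF_8)); // prints [my-topic 11 42]
    }
}
```

{{Files.createDirectories}} does nothing when the directory already exists, so the extra call is cheap on the common path. The sketch also omits the fsync that a durable implementation would want before the swap.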



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)