You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/01 18:18:46 UTC

[kafka] 01/02: KAFKA-6499: Do not write offset checkpoint file with empty offset map (#4492)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 69e6be673e396fe9db59e0222b8e6c5a4e11a48e
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Feb 1 10:11:29 2018 -0800

    KAFKA-6499: Do not write offset checkpoint file with empty offset map (#4492)
    
    * In OffsetCheckpoint.write(), if the offset map passed in is empty, skip writing the file, which would otherwise contain only the version number and a zero entry count. From the reader's point of view, this is equivalent to the file not existing.
    * Add related unit tests.
    * Minor fixes to log4j messages.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../processor/internals/AbstractStateManager.java   |  2 +-
 .../processor/internals/GlobalStateManagerImpl.java |  2 +-
 .../processor/internals/ProcessorStateManager.java  | 13 +++++++------
 .../streams/state/internals/OffsetCheckpoint.java   |  5 +++++
 .../internals/ProcessorStateManagerTest.java        |  6 +++---
 .../state/internals/OffsetCheckpointTest.java       | 21 +++++++++++++++++++--
 6 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d387762..b270e03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ abstract class AbstractStateManager implements StateManager {
         try {
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException fatalException) {
-            log.error("Failed to update checkpoint file for global stores.", fatalException);
+            log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
             throw new StreamsException("Failed to reinitialize global store.", fatalException);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8f..56e6bed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global globalStores", e);
+                log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e14..e7a23bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public class ProcessorStateManager extends AbstractStateManager {
     // write the checkpoint
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
@@ -311,14 +310,16 @@ public class ProcessorStateManager extends AbstractStateManager {
                 }
             }
         }
-        // write the checkpoint file before closing, to indicate clean shutdown
+        // write the checkpoint file before closing
+        if (checkpoint == null) {
+            checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+        }
+
+        log.trace("Writing checkpoint: {}", checkpointableOffsets);
         try {
-            if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-            }
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
+            log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c14737..9f0e1f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
+        // if there are no offsets, skip writing the file to save disk IOs
+        if (offsets.isEmpty()) {
+            return;
+        }
+
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             final File temp = new File(file.getAbsolutePath() + ".tmp");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ab9abc3..31f07cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -309,8 +309,8 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         try {
-            // make sure the checkpoint file isn't deleted
-            assertTrue(checkpointFile.exists());
+            // make sure the checkpoint file is not written yet
+            assertFalse(checkpointFile.exists());
 
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
@@ -630,7 +630,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 1), 123L));
         assertTrue(checkpointFile.exists());
 
         ProcessorStateManager stateManager = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 3b78d05..54cd3df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +35,8 @@ public class OffsetCheckpointTest {
 
     @Test
     public void testReadWrite() throws IOException {
-        File f = TestUtils.tempFile();
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -56,4 +57,20 @@ public class OffsetCheckpointTest {
             checkpoint.delete();
         }
     }
+
+    @Test
+    public void shouldNotWriteCheckpointWhenNoOffsets() throws IOException {
+        // we do not need to worry about file name uniqueness since this file should not be created
+        final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        assertFalse(f.exists());
+
+        assertEquals(Collections.<TopicPartition, Long>emptyMap(), checkpoint.read());
+
+        // deleting a non-existent checkpoint file should be fine
+        checkpoint.delete();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.