You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/30 12:42:22 UTC

kafka git commit: KAFKA-4902; Utils#delete should correctly handle I/O errors and symlinks

Repository: kafka
Updated Branches:
  refs/heads/trunk 43fb2df7a -> d345d53e4


KAFKA-4902; Utils#delete should correctly handle I/O errors and symlinks

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Jun Rao <ju...@gmail.com>, Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2691 from cmccabe/KAFKA-4902


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d345d53e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d345d53e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d345d53e

Branch: refs/heads/trunk
Commit: d345d53e4e5e4f74707e2521aa635b93ba3f1e7b
Parents: 43fb2df
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Mar 30 13:34:52 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Mar 30 13:38:09 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +-
 .../org/apache/kafka/common/utils/Utils.java    | 37 ++++++++++++++------
 .../apache/kafka/common/utils/UtilsTest.java    | 24 +++++++++++++
 .../java/org/apache/kafka/test/TestUtils.java   |  9 ++++-
 .../examples/pageview/JsonPOJOSerializer.java   |  3 +-
 .../kafka/streams/state/internals/Segment.java  |  9 ++++-
 .../kafka/streams/state/internals/Segments.java | 11 +++++-
 .../internals/ProcessorStateManagerTest.java    |  2 +-
 .../processor/internals/StandbyTaskTest.java    |  3 +-
 .../processor/internals/StateDirectoryTest.java |  7 ++--
 .../processor/internals/StreamTaskTest.java     |  3 +-
 .../StreamThreadStateStoreProviderTest.java     |  2 +-
 12 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f722aba..ea1619e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/>
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 796b019..a7d2a1b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -35,9 +35,13 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -545,19 +549,30 @@ public class Utils {
      *
      * @param file The root file at which to begin deleting
      */
-    public static void delete(File file) {
-        if (file == null) {
+    public static void delete(final File file) throws IOException {
+        if (file == null)
             return;
-        } else if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            if (files != null) {
-                for (File f : files)
-                    delete(f);
+        Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
+                // If the root path did not exist, ignore the error; otherwise throw it.
+                if (exc instanceof NoSuchFileException && path.toFile().equals(file))
+                    return FileVisitResult.TERMINATE;
+                throw exc;
             }
-            file.delete();
-        } else {
-            file.delete();
-        }
+
+            @Override
+            public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
+                Files.delete(path);
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path path, IOException exc) throws IOException {
+                Files.delete(path);
+                return FileVisitResult.CONTINUE;
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 512c29c..2d6d05c 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -24,10 +24,12 @@ import org.junit.Test;
 import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
@@ -425,4 +427,26 @@ public class UtilsTest {
         }
     }
 
+    @Test(timeout = 120000)
+    public void testRecursiveDelete() throws IOException {
+        Utils.delete(null); // delete of null does nothing.
+
+        // Test that deleting a temporary file works.
+        File tempFile = TestUtils.tempFile();
+        Utils.delete(tempFile);
+        assertFalse(Files.exists(tempFile.toPath()));
+
+        // Test recursive deletes
+        File tempDir = TestUtils.tempDirectory();
+        File tempDir2 = TestUtils.tempDirectory(tempDir.toPath(), "a");
+        TestUtils.tempDirectory(tempDir.toPath(), "b");
+        TestUtils.tempDirectory(tempDir2.toPath(), "c");
+        Utils.delete(tempDir);
+        assertFalse(Files.exists(tempDir.toPath()));
+        assertFalse(Files.exists(tempDir2.toPath()));
+
+        // Test that deleting a non-existent directory hierarchy works.
+        Utils.delete(tempDir);
+        assertFalse(Files.exists(tempDir.toPath()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index b5fada4..ea857a0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.xml.bind.DatatypeConverter;
 import java.io.File;
@@ -50,6 +52,7 @@ import static org.junit.Assert.fail;
  * Helper functions for writing unit tests
  */
 public class TestUtils {
+    private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
 
     public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
 
@@ -165,7 +168,11 @@ public class TestUtils {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                Utils.delete(file);
+                try {
+                    Utils.delete(file);
+                } catch (IOException e) {
+                    log.error("Error deleting {}", file.getAbsolutePath(), e);
+                }
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
index 6a5abea..625bda9 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -31,8 +31,7 @@ public class JsonPOJOSerializer<T> implements Serializer<T> {
      */
     public JsonPOJOSerializer() {
     }
-
-    @SuppressWarnings("unchecked")
+    
     @Override
     public void configure(Map<String, ?> props, boolean isKey) {
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f3f42a2..1311a27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
+import java.io.IOException;
+
 // Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
 class Segment extends RocksDBStore<Bytes, byte[]> {
     public final long id;
@@ -29,7 +31,7 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
         this.id = id;
     }
 
-    void destroy() {
+    void destroy() throws IOException {
         Utils.delete(dbDir);
     }
 
@@ -41,4 +43,9 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
 
         open = true;
     }
+
+    @Override
+    public String toString() {
+        return "Segment(id=" + id + ", name=" + name() + ")";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 5dedb40..a02f87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -19,8 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +37,8 @@ import java.util.concurrent.ConcurrentHashMap;
  * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
  */
 class Segments {
+    private static final Logger log = LoggerFactory.getLogger(Segments.class);
+
     static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
 
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
@@ -167,7 +172,11 @@ class Segments {
             if (segment != null && segment.id <= oldestSegmentId) {
                 segments.remove(segmentEntry.getKey());
                 segment.close();
-                segment.destroy();
+                try {
+                    segment.destroy();
+                } catch (IOException e) {
+                    log.error("Error destroying {}", segment, e);
+                }
             }
         }
         if (oldestSegmentId > minSegmentId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a1e1c01..fe8c186 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -85,7 +85,7 @@ public class ProcessorStateManagerTest {
     }
 
     @After
-    public void cleanup() {
+    public void cleanup() throws IOException {
         Utils.delete(baseDir);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b3a8dab..1442b9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -45,6 +45,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -149,7 +150,7 @@ public class StandbyTaskTest {
     }
 
     @After
-    public void cleanup() {
+    public void cleanup() throws IOException {
         Utils.delete(baseDir);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 23fed21..e8d2763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -25,6 +25,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
@@ -52,10 +53,8 @@ public class StateDirectoryTest {
     }
 
     @After
-    public void cleanup() {
-        if (stateDir.exists()) {
-            Utils.delete(stateDir);
-        }
+    public void cleanup() throws IOException {
+        Utils.delete(stateDir);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7c9f46b..6256434 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
 import org.junit.Before;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -146,7 +147,7 @@ public class StreamTaskTest {
     }
 
     @After
-    public void cleanup() {
+    public void cleanup() throws IOException {
         if (task != null) {
             try {
                 task.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 3886be8..3102685 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -132,7 +132,7 @@ public class StreamThreadStateStoreProviderTest {
     }
 
     @After
-    public void cleanUp() {
+    public void cleanUp() throws IOException {
         Utils.delete(stateDir);
     }