You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/30 12:42:22 UTC
kafka git commit: KAFKA-4902;
Utils#delete should correctly handle I/O errors and symlinks
Repository: kafka
Updated Branches:
refs/heads/trunk 43fb2df7a -> d345d53e4
KAFKA-4902; Utils#delete should correctly handle I/O errors and symlinks
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Jun Rao <ju...@gmail.com>, Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2691 from cmccabe/KAFKA-4902
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d345d53e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d345d53e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d345d53e
Branch: refs/heads/trunk
Commit: d345d53e4e5e4f74707e2521aa635b93ba3f1e7b
Parents: 43fb2df
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Thu Mar 30 13:34:52 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Mar 30 13:38:09 2017 +0100
----------------------------------------------------------------------
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/common/utils/Utils.java | 37 ++++++++++++++------
.../apache/kafka/common/utils/UtilsTest.java | 24 +++++++++++++
.../java/org/apache/kafka/test/TestUtils.java | 9 ++++-
.../examples/pageview/JsonPOJOSerializer.java | 3 +-
.../kafka/streams/state/internals/Segment.java | 9 ++++-
.../kafka/streams/state/internals/Segments.java | 11 +++++-
.../internals/ProcessorStateManagerTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 3 +-
.../processor/internals/StateDirectoryTest.java | 7 ++--
.../processor/internals/StreamTaskTest.java | 3 +-
.../StreamThreadStateStoreProviderTest.java | 2 +-
12 files changed, 87 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f722aba..ea1619e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/>
+ files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 796b019..a7d2a1b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -35,9 +35,13 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -545,19 +549,30 @@ public class Utils {
*
* @param file The root file at which to begin deleting
*/
- public static void delete(File file) {
- if (file == null) {
+ public static void delete(final File file) throws IOException {
+ if (file == null)
return;
- } else if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files != null) {
- for (File f : files)
- delete(f);
+ Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
+ // If the root path did not exist, ignore the error; otherwise throw it.
+ if (exc instanceof NoSuchFileException && path.toFile().equals(file))
+ return FileVisitResult.TERMINATE;
+ throw exc;
}
- file.delete();
- } else {
- file.delete();
- }
+
+ @Override
+ public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
+ Files.delete(path);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path path, IOException exc) throws IOException {
+ Files.delete(path);
+ return FileVisitResult.CONTINUE;
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 512c29c..2d6d05c 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -24,10 +24,12 @@ import org.junit.Test;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
@@ -425,4 +427,26 @@ public class UtilsTest {
}
}
+ @Test(timeout = 120000)
+ public void testRecursiveDelete() throws IOException {
+ Utils.delete(null); // delete of null does nothing.
+
+ // Test that deleting a temporary file works.
+ File tempFile = TestUtils.tempFile();
+ Utils.delete(tempFile);
+ assertFalse(Files.exists(tempFile.toPath()));
+
+ // Test recursive deletes
+ File tempDir = TestUtils.tempDirectory();
+ File tempDir2 = TestUtils.tempDirectory(tempDir.toPath(), "a");
+ TestUtils.tempDirectory(tempDir.toPath(), "b");
+ TestUtils.tempDirectory(tempDir2.toPath(), "c");
+ Utils.delete(tempDir);
+ assertFalse(Files.exists(tempDir.toPath()));
+ assertFalse(Files.exists(tempDir2.toPath()));
+
+ // Test that deleting a non-existent directory hierarchy works.
+ Utils.delete(tempDir);
+ assertFalse(Files.exists(tempDir.toPath()));
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index b5fada4..ea857a0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
@@ -50,6 +52,7 @@ import static org.junit.Assert.fail;
* Helper functions for writing unit tests
*/
public class TestUtils {
+ private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
@@ -165,7 +168,11 @@ public class TestUtils {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- Utils.delete(file);
+ try {
+ Utils.delete(file);
+ } catch (IOException e) {
+ log.error("Error deleting {}", file.getAbsolutePath(), e);
+ }
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
index 6a5abea..625bda9 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java
@@ -31,8 +31,7 @@ public class JsonPOJOSerializer<T> implements Serializer<T> {
*/
public JsonPOJOSerializer() {
}
-
- @SuppressWarnings("unchecked")
+
@Override
public void configure(Map<String, ?> props, boolean isKey) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f3f42a2..1311a27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.ProcessorContext;
+import java.io.IOException;
+
// Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
class Segment extends RocksDBStore<Bytes, byte[]> {
public final long id;
@@ -29,7 +31,7 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
this.id = id;
}
- void destroy() {
+ void destroy() throws IOException {
Utils.delete(dbDir);
}
@@ -41,4 +43,9 @@ class Segment extends RocksDBStore<Bytes, byte[]> {
open = true;
}
+
+ @Override
+ public String toString() {
+ return "Segment(id=" + id + ", name=" + name() + ")";
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 5dedb40..a02f87e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -19,8 +19,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,6 +37,8 @@ import java.util.concurrent.ConcurrentHashMap;
* Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
*/
class Segments {
+ private static final Logger log = LoggerFactory.getLogger(Segments.class);
+
static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
@@ -167,7 +172,11 @@ class Segments {
if (segment != null && segment.id <= oldestSegmentId) {
segments.remove(segmentEntry.getKey());
segment.close();
- segment.destroy();
+ try {
+ segment.destroy();
+ } catch (IOException e) {
+ log.error("Error destroying {}", segment, e);
+ }
}
}
if (oldestSegmentId > minSegmentId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a1e1c01..fe8c186 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -85,7 +85,7 @@ public class ProcessorStateManagerTest {
}
@After
- public void cleanup() {
+ public void cleanup() throws IOException {
Utils.delete(baseDir);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b3a8dab..1442b9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -45,6 +45,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -149,7 +150,7 @@ public class StandbyTaskTest {
}
@After
- public void cleanup() {
+ public void cleanup() throws IOException {
Utils.delete(baseDir);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 23fed21..e8d2763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -25,6 +25,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
@@ -52,10 +53,8 @@ public class StateDirectoryTest {
}
@After
- public void cleanup() {
- if (stateDir.exists()) {
- Utils.delete(stateDir);
- }
+ public void cleanup() throws IOException {
+ Utils.delete(stateDir);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 7c9f46b..6256434 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
import org.junit.Before;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -146,7 +147,7 @@ public class StreamTaskTest {
}
@After
- public void cleanup() {
+ public void cleanup() throws IOException {
if (task != null) {
try {
task.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 3886be8..3102685 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -132,7 +132,7 @@ public class StreamThreadStateStoreProviderTest {
}
@After
- public void cleanUp() {
+ public void cleanUp() throws IOException {
Utils.delete(stateDir);
}