You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/25 23:20:54 UTC
kafka git commit: KAFKA-2718: Prevent temp directory being reused in
parallel test runs
Repository: kafka
Updated Branches:
refs/heads/trunk 617a91a23 -> 8ed271b82
KAFKA-2718: Prevent temp directory being reused in parallel test runs
Use Files.createTempDirectory to avoid reuse, for log directories create a new temp directory as parent
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma, Guozhang Wang
Closes #583 from rajinisivaram/KAFKA-2718-v2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ed271b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ed271b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ed271b8
Branch: refs/heads/trunk
Commit: 8ed271b82abd4641b594aff262512bd9eb588263
Parents: 617a91a
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Nov 25 14:20:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 25 14:20:48 2015 -0800
----------------------------------------------------------------------
.../test/scala/other/kafka/StressTestLog.scala | 2 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 10 +++-----
.../test/scala/unit/kafka/log/CleanerTest.scala | 5 ++--
.../src/test/scala/unit/kafka/log/LogTest.scala | 6 ++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 24 ++++++++++++++------
5 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 225d77b..5f0e650 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -32,7 +32,7 @@ object StressTestLog {
val running = new AtomicBoolean(true)
def main(args: Array[String]) {
- val dir = TestUtils.tempDir()
+ val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir())
val time = new MockTime
val logProprties = new Properties()
logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 6180b87..d0cb4a1 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -34,18 +34,14 @@ import scala.collection.JavaConversions._
@RunWith(value = classOf[Parameterized])
class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite {
- var logDir: File = null
+ val tmpDir = TestUtils.tempDir()
+ val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val time = new MockTime(0)
val logConfig = LogConfig()
- @Before
- def setUp() {
- logDir = TestUtils.tempDir()
- }
-
@After
def tearDown() {
- CoreUtils.rm(logDir)
+ CoreUtils.rm(tmpDir)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 49869aa..8ab9f91 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -37,7 +37,8 @@ import scala.collection._
*/
class CleanerTest extends JUnitSuite {
- val dir = TestUtils.tempDir()
+ val tmpdir = TestUtils.tempDir()
+ val dir = TestUtils.randomPartitionLogDir(tmpdir)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
@@ -48,7 +49,7 @@ class CleanerTest extends JUnitSuite {
@After
def teardown() {
- CoreUtils.rm(dir)
+ CoreUtils.rm(tmpdir)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7f0d9d6..f4427b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,21 +30,21 @@ import kafka.server.KafkaConfig
class LogTest extends JUnitSuite {
- var logDir: File = null
+ val tmpDir = TestUtils.tempDir()
+ val logDir = TestUtils.randomPartitionLogDir(tmpDir)
val time = new MockTime(0)
var config: KafkaConfig = null
val logConfig = LogConfig()
@Before
def setUp() {
- logDir = TestUtils.tempDir()
val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
config = KafkaConfig.fromProps(props)
}
@After
def tearDown() {
- CoreUtils.rm(logDir)
+ CoreUtils.rm(tmpDir)
}
def createEmptyLogs(dir: File, offsets: Int*) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2f734f6..88c91f4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -19,6 +19,7 @@ package kafka.utils
import java.io._
import java.nio._
+import java.nio.file.Files
import java.nio.channels._
import java.util.Random
import java.util.Properties
@@ -35,7 +36,6 @@ import org.apache.kafka.test.TestSslUtils
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
-
import kafka.server._
import kafka.producer._
import kafka.message._
@@ -94,11 +94,9 @@ object TestUtils extends Logging {
* Create a temporary relative directory
*/
def tempRelativeDir(parent: String): File = {
- new File(parent).mkdirs()
- val attempts = 1000
- val f = Iterator.continually(new File(parent, "kafka-" + random.nextInt(1000000)))
- .take(attempts).find(_.mkdir())
- .getOrElse(sys.error(s"Failed to create directory after $attempts attempts"))
+ val parentFile = new File(parent)
+ parentFile.mkdirs()
+ val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile
f.deleteOnExit()
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -106,7 +104,19 @@ object TestUtils extends Logging {
CoreUtils.rm(f)
}
})
-
+ f
+ }
+
+ /**
+ * Create a random log directory in the format <string>-<int> used for Kafka partition logs.
+ * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory.
+ */
+ def randomPartitionLogDir(parentDir: File): File = {
+ val attempts = 1000
+ val f = Iterator.continually(new File(parentDir, "kafka-" + random.nextInt(1000000)))
+ .take(attempts).find(_.mkdir())
+ .getOrElse(sys.error(s"Failed to create directory after $attempts attempts"))
+ f.deleteOnExit()
f
}