You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/25 23:20:54 UTC

kafka git commit: KAFKA-2718: Prevent temp directory being reused in parallel test runs

Repository: kafka
Updated Branches:
  refs/heads/trunk 617a91a23 -> 8ed271b82


KAFKA-2718: Prevent temp directory being reused in parallel test runs

Use Files.createTempDirectory to avoid reuse, for log directories create a new temp directory as parent

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #583 from rajinisivaram/KAFKA-2718-v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ed271b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ed271b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ed271b8

Branch: refs/heads/trunk
Commit: 8ed271b82abd4641b594aff262512bd9eb588263
Parents: 617a91a
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Nov 25 14:20:48 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 25 14:20:48 2015 -0800

----------------------------------------------------------------------
 .../test/scala/other/kafka/StressTestLog.scala  |  2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  | 10 +++-----
 .../test/scala/unit/kafka/log/CleanerTest.scala |  5 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  6 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 24 ++++++++++++++------
 5 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 225d77b..5f0e650 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -32,7 +32,7 @@ object StressTestLog {
   val running = new AtomicBoolean(true)
   
   def main(args: Array[String]) {
-    val dir = TestUtils.tempDir()
+    val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir())
     val time = new MockTime
     val logProprties = new Properties()
     logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 6180b87..d0cb4a1 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -34,18 +34,14 @@ import scala.collection.JavaConversions._
 @RunWith(value = classOf[Parameterized])
 class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite {
 
-  var logDir: File = null
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val time = new MockTime(0)
   val logConfig = LogConfig()
 
-  @Before
-  def setUp() {
-    logDir = TestUtils.tempDir()
-  }
-
   @After
   def tearDown() {
-    CoreUtils.rm(logDir)
+    CoreUtils.rm(tmpDir)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 49869aa..8ab9f91 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -37,7 +37,8 @@ import scala.collection._
  */
 class CleanerTest extends JUnitSuite {
   
-  val dir = TestUtils.tempDir()
+  val tmpdir = TestUtils.tempDir()
+  val dir = TestUtils.randomPartitionLogDir(tmpdir)
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
@@ -48,7 +49,7 @@ class CleanerTest extends JUnitSuite {
   
   @After
   def teardown() {
-    CoreUtils.rm(dir)
+    CoreUtils.rm(tmpdir)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7f0d9d6..f4427b9 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -30,21 +30,21 @@ import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
   
-  var logDir: File = null
+  val tmpDir = TestUtils.tempDir()
+  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val time = new MockTime(0)
   var config: KafkaConfig = null
   val logConfig = LogConfig()  
 
   @Before
   def setUp() {
-    logDir = TestUtils.tempDir()
     val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
     config = KafkaConfig.fromProps(props)
   }
 
   @After
   def tearDown() {
-    CoreUtils.rm(logDir)
+    CoreUtils.rm(tmpDir)
   }
   
   def createEmptyLogs(dir: File, offsets: Int*) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2f734f6..88c91f4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.io._
 import java.nio._
+import java.nio.file.Files
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
@@ -35,7 +36,6 @@ import org.apache.kafka.test.TestSslUtils
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
-
 import kafka.server._
 import kafka.producer._
 import kafka.message._
@@ -94,11 +94,9 @@ object TestUtils extends Logging {
    * Create a temporary relative directory
    */
   def tempRelativeDir(parent: String): File = {
-    new File(parent).mkdirs()
-    val attempts = 1000
-    val f = Iterator.continually(new File(parent, "kafka-" + random.nextInt(1000000)))
-                    .take(attempts).find(_.mkdir())
-                    .getOrElse(sys.error(s"Failed to create directory after $attempts attempts"))
+    val parentFile = new File(parent)
+    parentFile.mkdirs()
+    val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile
     f.deleteOnExit()
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -106,7 +104,19 @@ object TestUtils extends Logging {
         CoreUtils.rm(f)
       }
     })
-
+    f
+  }
+  
+  /**
+   * Create a random log directory in the format <string>-<int> used for Kafka partition logs.
+   * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory.
+   */
+  def randomPartitionLogDir(parentDir: File): File = {
+    val attempts = 1000
+    val f = Iterator.continually(new File(parentDir, "kafka-" + random.nextInt(1000000)))
+                                  .take(attempts).find(_.mkdir())
+                                  .getOrElse(sys.error(s"Failed to create directory after $attempts attempts"))
+    f.deleteOnExit()
     f
   }