You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:19 UTC
[03/50] [abbrv] kafka git commit: MINOR: Remove a couple of redundant
`CoreUtils.rm` methods
MINOR: Remove a couple of redundant `CoreUtils.rm` methods
Also:
* Rename remaining `CoreUtils.rm` to `delete` for consistency
* Use `try-with-resources` in `Utils` to simplify code
* Silence compiler warning due to exception catch clause in `TestUtils`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1153 from ijuma/remove-redundant-core-utils-rm
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43d5078e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43d5078e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43d5078e
Branch: refs/heads/0.10.0
Commit: 43d5078e981bbb25fd81cdc8ba4c339cd2d3f3d2
Parents: 4c0660b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Mon Mar 28 14:35:31 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 28 14:35:31 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/utils/Utils.java | 18 ++++---------
core/src/main/scala/kafka/log/Log.scala | 3 ++-
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 7 ++---
core/src/main/scala/kafka/utils/CoreUtils.scala | 27 +-------------------
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../test/scala/other/kafka/StressTestLog.scala | 5 ++--
.../other/kafka/TestLinearWriteSpeed.scala | 7 +++--
.../unit/kafka/admin/AddPartitionsTest.scala | 2 +-
.../test/scala/unit/kafka/admin/AdminTest.scala | 12 +++++----
.../integration/KafkaServerTestHarness.scala | 2 +-
.../kafka/integration/RollingBounceTest.scala | 2 +-
.../integration/UncleanLeaderElectionTest.scala | 2 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 9 +++----
.../test/scala/unit/kafka/log/CleanerTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 5 ++--
.../scala/unit/kafka/log/LogManagerTest.scala | 5 ++--
.../src/test/scala/unit/kafka/log/LogTest.scala | 6 ++---
.../unit/kafka/producer/ProducerTest.scala | 4 +--
.../unit/kafka/server/AdvertiseBrokerTest.scala | 4 +--
.../server/HighwatermarkPersistenceTest.scala | 7 +++--
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 3 ++-
.../unit/kafka/server/LogRecoveryTest.scala | 3 ++-
.../unit/kafka/server/OffsetCommitTest.scala | 9 +++----
.../server/ServerGenerateBrokerIdTest.scala | 18 ++++++-------
.../unit/kafka/server/ServerShutdownTest.scala | 6 ++---
.../unit/kafka/server/ServerStartupTest.scala | 6 ++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++----
.../scala/unit/kafka/zk/EmbeddedZookeeper.scala | 9 +++----
29 files changed, 86 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4c4225b..0167548 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -442,13 +442,8 @@ public class Utils {
*/
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
Properties props = new Properties();
- InputStream propStream = null;
- try {
- propStream = new FileInputStream(filename);
+ try (InputStream propStream = new FileInputStream(filename)) {
props.load(propStream);
- } finally {
- if (propStream != null)
- propStream.close();
}
return props;
}
@@ -540,16 +535,13 @@ public class Utils {
*/
public static String readFileAsString(String path, Charset charset) throws IOException {
if (charset == null) charset = Charset.defaultCharset();
- FileInputStream stream = new FileInputStream(new File(path));
- String result = new String();
- try {
+
+ try (FileInputStream stream = new FileInputStream(new File(path))) {
FileChannel fc = stream.getChannel();
MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
- result = charset.decode(bb).toString();
- } finally {
- stream.close();
+ return charset.decode(bb).toString();
}
- return result;
+
}
public static String readFileAsString(String path) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8c956f7..81c19fa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType
import scala.collection.JavaConversions
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.Utils
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
@@ -714,7 +715,7 @@ class Log(val dir: File,
removeLogMetrics()
logSegments.foreach(_.delete())
segments.clear()
- CoreUtils.rm(dir)
+ Utils.delete(dir)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index cc0da9f..686f692 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -22,14 +22,15 @@ package kafka.metrics
import com.yammer.metrics.Metrics
import java.io.File
+
import com.yammer.metrics.reporting.CsvReporter
import java.util.concurrent.TimeUnit
-import kafka.utils.{CoreUtils, VerifiableProperties, Logging}
+import kafka.utils.{Logging, VerifiableProperties}
+import org.apache.kafka.common.utils.Utils
private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
-
private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
with KafkaCSVMetricsReporterMBean
with Logging {
@@ -48,7 +49,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
if (!initialized) {
val metricsConfig = new KafkaMetricsConfig(props)
csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
- CoreUtils.rm(csvDir)
+ Utils.delete(csvDir)
csvDir.mkdirs()
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index b01f5cc..fe2bebf 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -79,35 +79,10 @@ object CoreUtils extends Logging {
}
/**
- * Recursively delete the given file/directory and any subfiles (if any exist)
- * @param file The root file at which to begin deleting
- */
- def rm(file: String): Unit = rm(new File(file))
-
- /**
* Recursively delete the list of files/directories and any subfiles (if any exist)
* @param files sequence of files to be deleted
*/
- def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))
-
- /**
- * Recursively delete the given file/directory and any subfiles (if any exist)
- * @param file The root file at which to begin deleting
- */
- def rm(file: File) {
- if(file == null) {
- return
- } else if(file.isDirectory) {
- val files = file.listFiles()
- if(files != null) {
- for(f <- files)
- rm(f)
- }
- file.delete()
- } else {
- file.delete()
- }
- }
+ def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f)))
/**
* Register the given mbean with the platform mbean server,
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index c4a2bd7..fc1ceec 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -55,7 +55,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
@After
override def tearDown() {
server.shutdown
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index dead0eb..8adc7e2 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -19,11 +19,12 @@ package kafka
import java.util.Properties
import java.util.concurrent.atomic._
-import kafka.common._
+
import kafka.message._
import kafka.log._
import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
+import org.apache.kafka.common.utils.Utils
/**
* A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
@@ -55,7 +56,7 @@ object StressTestLog {
running.set(false)
writer.join()
reader.join()
- CoreUtils.rm(dir)
+ Utils.delete(dir)
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 236d857..db281bf 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -21,11 +21,14 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.{Properties, Random}
+
import kafka.log._
import kafka.utils._
import kafka.message._
+
import scala.math._
import joptsimple._
+import org.apache.kafka.common.utils.Utils
/**
* This test does linear writes using either a kafka log or a file and measures throughput and latency.
@@ -196,7 +199,7 @@ object TestLinearWriteSpeed {
}
class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
- CoreUtils.rm(dir)
+ Utils.delete(dir)
val log = new Log(dir, config, 0L, scheduler, SystemTime)
def write(): Int = {
log.append(messages, true)
@@ -204,7 +207,7 @@ object TestLinearWriteSpeed {
}
def close() {
log.close()
- CoreUtils.rm(log.dir)
+ Utils.delete(log.dir)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index b9bbace..ab8d363 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -59,7 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 8910e09..21bb6ab 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,13 +22,15 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.junit.Assert._
import org.junit.Test
import java.util.Properties
+
import kafka.utils._
import kafka.log._
import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, TopicAndPartition}
-import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
import java.io.File
+
import TestUtils._
import scala.collection.{Map, immutable}
@@ -418,7 +420,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertEquals(newConfig, configInZk)
} finally {
server.shutdown()
- server.config.logDirs.foreach(CoreUtils.rm(_))
+ CoreUtils.delete(server.config.logDirs)
}
}
@@ -449,7 +451,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
assertEquals(new Quota(2000, true), server.apis.quotaManagers(ApiKeys.FETCH.id).quota(clientId))
} finally {
server.shutdown()
- server.config.logDirs.foreach(CoreUtils.rm(_))
+ CoreUtils.delete(server.config.logDirs)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 870b9ad..676772f 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -87,7 +87,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
@After
override def tearDown() {
servers.foreach(_.shutdown())
- servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index b931568..5221855 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -43,7 +43,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index b725d8b..a8ba283 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -82,7 +82,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
servers.foreach(server => shutdownServer(server))
- servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
// restore log levels
kafkaApisLogger.setLevel(Level.ERROR)
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index d0cb4a1..7487bc5 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -17,7 +17,6 @@
package kafka.log
-import java.io.File
import kafka.utils._
import kafka.message._
import org.scalatest.junit.JUnitSuite
@@ -26,9 +25,9 @@ import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.{Properties, Collection, ArrayList}
-import kafka.server.KafkaConfig
import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.utils.Utils
+import java.util.{Collection, Properties}
import scala.collection.JavaConversions._
@RunWith(value = classOf[Parameterized])
@@ -41,7 +40,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
@After
def tearDown() {
- CoreUtils.rm(tmpDir)
+ Utils.delete(tmpDir)
}
/**
@@ -78,4 +77,4 @@ object BrokerCompressionTest {
messageCompression <- CompressionType.values
) yield Array(messageCompression.name, brokerCompression)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 3773233..b6849f0 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -49,7 +49,7 @@ class CleanerTest extends JUnitSuite {
@After
def teardown() {
- CoreUtils.rm(tmpdir)
+ Utils.delete(tmpdir)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 6b91611..cc9873c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,6 +25,7 @@ import kafka.message._
import kafka.server.OffsetCheckpoint
import kafka.utils._
import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit._
import org.junit.runner.RunWith
@@ -119,7 +120,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
@After
def teardown() {
time.scheduler.shutdown()
- CoreUtils.rm(logDir)
+ Utils.delete(logDir)
}
/* create a cleaner instance and logs with the given parameters */
@@ -165,4 +166,4 @@ object LogCleanerIntegrationTest {
list.add(Array(codec.name))
list
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 46bfbed..f290d54 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -24,6 +24,7 @@ import kafka.common._
import kafka.server.OffsetCheckpoint
import kafka.utils._
import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -54,8 +55,8 @@ class LogManagerTest {
def tearDown() {
if(logManager != null)
logManager.shutdown()
- CoreUtils.rm(logDir)
- logManager.logDirs.foreach(CoreUtils.rm(_))
+ Utils.delete(logDir)
+ logManager.logDirs.foreach(Utils.delete)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2eb817..4d75d53 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,7 +19,6 @@ package kafka.log
import java.io._
import java.util.Properties
-import java.util.concurrent.atomic._
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
import kafka.api.ApiVersion
@@ -30,6 +29,7 @@ import org.junit.{After, Before, Test}
import kafka.message._
import kafka.utils._
import kafka.server.KafkaConfig
+import org.apache.kafka.common.utils.Utils
class LogTest extends JUnitSuite {
@@ -47,7 +47,7 @@ class LogTest extends JUnitSuite {
@After
def tearDown() {
- CoreUtils.rm(tmpDir)
+ Utils.delete(tmpDir)
}
def createEmptyLogs(dir: File, offsets: Int*) {
@@ -810,7 +810,7 @@ class LogTest extends JUnitSuite {
log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
assertEquals(numMessages, log.logEndOffset)
assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
- CoreUtils.rm(logDir)
+ Utils.delete(logDir)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 4a1ad5a..cf25cdb 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -96,8 +96,8 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
server1.shutdown
server2.shutdown
- CoreUtils.rm(server1.config.logDirs)
- CoreUtils.rm(server2.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
+ CoreUtils.delete(server2.config.logDirs)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 75fa664..dc17aa4 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -43,7 +43,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
server.shutdown()
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
super.tearDown()
}
@@ -55,4 +55,4 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
assertEquals(advertisedPort, endpoint.port)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 2e66601..26e2817 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -18,16 +18,15 @@ package kafka.server
import kafka.log._
import java.io.File
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.{Utils, MockTime => JMockTime}
import org.easymock.EasyMock
import org.junit._
import org.junit.Assert._
import kafka.common._
import kafka.cluster.Replica
-import kafka.utils.{ZkUtils, SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
+import kafka.utils.{KafkaScheduler, MockTime, SystemTime, TestUtils, ZkUtils}
import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
class HighwatermarkPersistenceTest {
@@ -42,7 +41,7 @@ class HighwatermarkPersistenceTest {
@After
def teardown() {
for(manager <- logManagers; dir <- manager.logDirs)
- CoreUtils.rm(dir)
+ Utils.delete(dir)
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index e84780a..7258980 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -58,7 +58,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
@After
override def tearDown() {
servers.foreach(_.shutdown())
- servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8c86a7b..d5c696e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -30,6 +30,7 @@ import kafka.utils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Utils
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -57,7 +58,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
override def tearDown() {
simpleConsumer.close
server.shutdown
- CoreUtils.rm(logDir)
+ Utils.delete(logDir)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index e13bfd9..d37de76 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -26,6 +26,7 @@ import java.io.File
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.apache.kafka.common.utils.Utils
import org.junit.{After, Before, Test}
import org.junit.Assert._
@@ -94,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
producer.close()
for (server <- servers) {
server.shutdown()
- CoreUtils.rm(server.config.logDirs(0))
+ Utils.delete(new File(server.config.logDirs(0)))
}
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 1d5148b..29eaf2d 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -19,13 +19,14 @@ package kafka.server
import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
import kafka.consumer.SimpleConsumer
-import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.utils._
import kafka.utils.TestUtils._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.protocol.Errors
-
+import org.apache.kafka.common.utils.Utils
import org.junit.{After, Before, Test}
+import org.junit.Assert._
import java.util.Properties
import java.io.File
@@ -33,8 +34,6 @@ import java.io.File
import scala.util.Random
import scala.collection._
-import org.junit.Assert._
-
class OffsetCommitTest extends ZooKeeperTestHarness {
val random: Random = new Random()
val group = "test-group"
@@ -71,7 +70,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
override def tearDown() {
simpleConsumer.close
server.shutdown
- CoreUtils.rm(logDir)
+ Utils.delete(logDir)
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index c26ff13..8e25366 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -51,7 +51,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
server1.startup()
assertEquals(server1.config.brokerId, 1001)
server1.shutdown()
- CoreUtils.rm(server1.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -75,9 +75,9 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001))
assertTrue(verifyBrokerMetadata(server2.config.logDirs,0))
assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002))
- CoreUtils.rm(server1.config.logDirs)
- CoreUtils.rm(server2.config.logDirs)
- CoreUtils.rm(server3.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
+ CoreUtils.delete(server2.config.logDirs)
+ CoreUtils.delete(server3.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -93,7 +93,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
assertEquals(server3.config.brokerId,3)
server3.shutdown()
assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
- CoreUtils.rm(server3.config.logDirs)
+ CoreUtils.delete(server3.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -116,7 +116,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
server1.startup()
server1.shutdown()
assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
- CoreUtils.rm(server1.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -133,7 +133,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
case e: kafka.common.InconsistentBrokerIdException => //success
}
server1.shutdown()
- CoreUtils.rm(server1.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
@@ -170,8 +170,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
// verify correct broker metadata was written
assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1))
assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2))
- CoreUtils.rm(serverA.config.logDirs)
- CoreUtils.rm(newServerB.config.logDirs)
+ CoreUtils.delete(serverA.config.logDirs)
+ CoreUtils.delete(newServerB.config.logDirs)
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 67f62d9..bc71edd 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -104,7 +104,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
consumer.close()
producer.close()
server.shutdown()
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
verifyNonDaemonThreadsStatus
}
@@ -117,7 +117,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
server.startup()
server.shutdown()
server.awaitShutdown()
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
verifyNonDaemonThreadsStatus
}
@@ -145,7 +145,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
server.shutdown()
server.awaitShutdown()
}
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
verifyNonDaemonThreadsStatus
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b321a02..9b49365 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -40,7 +40,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
assertTrue(pathExists)
server.shutdown()
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
}
@Test
@@ -66,7 +66,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
server1.shutdown()
- CoreUtils.rm(server1.config.logDirs)
+ CoreUtils.delete(server1.config.logDirs)
}
@Test
@@ -80,6 +80,6 @@ class ServerStartupTest extends ZooKeeperTestHarness {
assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id)
server.shutdown()
- CoreUtils.rm(server.config.logDirs)
+ CoreUtils.delete(server.config.logDirs)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0730468..a1e7912 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,9 +21,8 @@ import java.io._
import java.nio._
import java.nio.file.Files
import java.nio.channels._
-import java.util
-import java.util.concurrent.{Callable, TimeUnit, Executors}
-import java.util.{Collections, Random, Properties}
+import java.util.concurrent.{Callable, Executors, TimeUnit}
+import java.util.{Collections, Properties, Random}
import java.security.cert.X509Certificate
import javax.net.ssl.X509TrustManager
import charset.Charset
@@ -52,6 +51,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.network.Mode
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.common.utils.Utils
import scala.collection.Map
import scala.collection.JavaConversions._
@@ -100,7 +100,7 @@ object TestUtils extends Logging {
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
- CoreUtils.rm(f)
+ Utils.delete(f)
}
})
f
@@ -1115,7 +1115,7 @@ object TestUtils extends Logging {
}
} catch {
case ie: InterruptedException => failWithTimeout()
- case e => exceptions += e
+ case e: Throwable => exceptions += e
} finally {
threadPool.shutdownNow()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 5fa2f65..1030c46 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,10 +21,9 @@ import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
-import javax.security.auth.login.Configuration
+
import kafka.utils.CoreUtils
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Utils.getPort
+import org.apache.kafka.common.utils.Utils
class EmbeddedZookeeper() {
val snapshotDir = TestUtils.tempDir()
@@ -40,8 +39,8 @@ class EmbeddedZookeeper() {
def shutdown() {
CoreUtils.swallow(zookeeper.shutdown())
CoreUtils.swallow(factory.shutdown())
- CoreUtils.rm(logDir)
- CoreUtils.rm(snapshotDir)
+ Utils.delete(logDir)
+ Utils.delete(snapshotDir)
}
}