You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/07 11:41:46 UTC
kafka git commit: KAFKA-4899;
Fix findbugs warnings in kafka-core [Forced Update!]
Repository: kafka
Updated Branches:
refs/heads/trunk 96ac0f307 -> ab148f39a (forced update)
KAFKA-4899; Fix findbugs warnings in kafka-core
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Jozef Koval <jo...@protonmail.ch>, Ismael Juma <is...@juma.me.uk>
Closes #2687 from cmccabe/KAFKA-4899
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ab148f39
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ab148f39
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ab148f39
Branch: refs/heads/trunk
Commit: ab148f39ae64ecbaa84f49c38b3cab8a0a0fd846
Parents: 359a685
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri Apr 7 12:29:48 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Apr 7 12:41:35 2017 +0100
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 4 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 1 -
.../main/scala/kafka/consumer/TopicCount.scala | 9 --
.../main/scala/kafka/javaapi/FetchRequest.scala | 12 +--
.../scala/kafka/javaapi/FetchResponse.scala | 11 ++-
.../javaapi/GroupCoordinatorResponse.scala | 11 ++-
.../kafka/javaapi/OffsetCommitRequest.scala | 16 ++--
.../kafka/javaapi/OffsetFetchRequest.scala | 16 ++--
.../scala/kafka/javaapi/OffsetRequest.scala | 17 ++--
.../scala/kafka/javaapi/OffsetResponse.scala | 16 ++--
.../kafka/javaapi/TopicMetadataResponse.scala | 11 ++-
core/src/main/scala/kafka/log/Log.scala | 19 ++--
core/src/main/scala/kafka/log/LogManager.scala | 5 +-
core/src/main/scala/kafka/log/LogSegment.scala | 9 +-
.../kafka/metrics/KafkaCSVMetricsReporter.scala | 3 +-
.../kafka/server/BrokerMetadataCheckpoint.scala | 14 ++-
.../scala/kafka/server/ClientQuotaManager.scala | 2 +-
.../server/checkpoints/CheckpointFile.scala | 8 +-
.../scala/kafka/tools/ImportZkOffsets.scala | 31 ++++---
.../main/scala/kafka/tools/MirrorMaker.scala | 17 ++--
core/src/main/scala/kafka/utils/FileLock.scala | 6 +-
gradle/findbugs-exclude.xml | 98 ++++++++++++++++++--
22 files changed, 204 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d4ae4ff..c5d7f12 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -307,8 +307,8 @@ object AdminUtils extends Logging with AdminUtilities {
if (brokerList.size != brokerList.toSet.size)
throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
- throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
- "available broker:" + availableBrokerList.toString)
+ throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList +
+ "available broker:" + availableBrokerList)
ret.put(partitionId, brokerList.toList)
if (ret(partitionId).size != ret(startPartitionId).size)
throw new AdminOperationException("partition " + i + " has different replication factor: " + brokerList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 8d712f4..ec60220 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -83,7 +83,6 @@ class ConsumerFetcherThread(name: String,
def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
val startTimestamp = config.autoOffsetReset match {
case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
- case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
case _ => OffsetRequest.LatestTime
}
val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index eb035f2..f423f8c 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -18,7 +18,6 @@
package kafka.consumer
import scala.collection._
-import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils}
import kafka.common.KafkaException
@@ -112,14 +111,6 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
def getConsumerThreadIdsPerTopic = TopicCount.makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap)
- override def equals(obj: Any): Boolean = {
- obj match {
- case null => false
- case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap
- case _ => false
- }
- }
-
def getTopicCountMap = topicCountMap
def pattern = TopicCount.staticPattern
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index fb9fa8e..fe8beaa 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -57,14 +57,14 @@ class FetchRequest(correlationId: Int,
override def toString = underlying.toString
- override def equals(other: Any) = canEqual(other) && {
- val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
- this.underlying.equals(otherFetchRequest.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: FetchRequest => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
-
override def hashCode = underlying.hashCode
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchResponse.scala b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
index 9c67dd8..c916555 100644
--- a/core/src/main/scala/kafka/javaapi/FetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
@@ -32,12 +32,13 @@ class FetchResponse(private val underlying: kafka.api.FetchResponse) {
def errorCode(topic: String, partition: Int) = error(topic, partition).code
- override def equals(other: Any) = canEqual(other) && {
- val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
- this.underlying.equals(otherFetchResponse.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: FetchResponse => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
-
override def hashCode = underlying.hashCode
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
index 9871ca0..096941c 100644
--- a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -31,13 +31,14 @@ class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinato
underlying.coordinatorOpt
}
- override def equals(other: Any) = canEqual(other) && {
- val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
- this.underlying.equals(otherConsumerMetadataResponse.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: GroupCoordinatorResponse => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
-
override def hashCode = underlying.hashCode
override def toString = underlying.toString
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 1924d5e..0c3c651 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -45,19 +45,15 @@ class OffsetCommitRequest(groupId: String,
this(groupId, requestInfo, correlationId, clientId, 0)
}
-
override def toString = underlying.toString
-
- override def equals(other: Any) = canEqual(other) && {
- val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetCommitRequest]
- this.underlying.equals(otherOffsetRequest.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: OffsetCommitRequest => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetCommitRequest]
-
-
override def hashCode = underlying.hashCode
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
index 8eb0d47..5f96439 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -44,21 +44,17 @@ class OffsetFetchRequest(groupId: String,
)
}
-
override def toString = underlying.toString
-
- override def equals(other: Any) = canEqual(other) && {
- val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetFetchRequest]
- this.underlying.equals(otherOffsetRequest.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: OffsetFetchRequest => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetFetchRequest]
-
-
override def hashCode = underlying.hashCode
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 21997d3..96b66ef 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -36,20 +36,15 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
)
}
-
-
override def toString = underlying.toString
-
- override def equals(other: Any) = canEqual(other) && {
- val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetRequest]
- this.underlying.equals(otherOffsetRequest.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: OffsetRequest => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetRequest]
-
-
override def hashCode = underlying.hashCode
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
index 42ee2ab..cb2047f 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
@@ -31,19 +31,15 @@ class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
def offsets(topic: String, partition: Int) =
underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray
-
- override def equals(other: Any) = canEqual(other) && {
- val otherOffsetResponse = other.asInstanceOf[kafka.javaapi.OffsetResponse]
- this.underlying.equals(otherOffsetResponse.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: OffsetResponse => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetResponse]
-
-
override def hashCode = underlying.hashCode
-
override def toString = underlying.toString
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
index 3359060..40f81d5 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
@@ -24,13 +24,14 @@ class TopicMetadataResponse(private val underlying: kafka.api.TopicMetadataRespo
underlying.topicsMetadata
}
- override def equals(other: Any) = canEqual(other) && {
- val otherTopicMetadataResponse = other.asInstanceOf[kafka.javaapi.TopicMetadataResponse]
- this.underlying.equals(otherTopicMetadataResponse.underlying)
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case null => false
+ case other: TopicMetadataResponse => this.underlying.equals(other.underlying)
+ case _ => false
+ }
}
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.TopicMetadataResponse]
-
override def hashCode = underlying.hashCode
override def toString = underlying.toString
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 999c6aa..0e8cda8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -18,22 +18,19 @@
package kafka.log
import java.io.{File, IOException}
+import java.nio.file.Files
import java.text.NumberFormat
import java.util.concurrent.atomic._
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
-import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_10_0_IV0
import kafka.common._
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
-import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetRequest
-import org.apache.kafka.common.utils.{Time, Utils}
import scala.collection.JavaConverters._
import scala.collection.{Seq, mutable}
@@ -210,7 +207,7 @@ class Log(@volatile var dir: File,
private def initializeLeaderEpochCache(): LeaderEpochCache = {
// create the log directory if it doesn't exist
- dir.mkdirs()
+ Files.createDirectories(dir.toPath)
new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
}
@@ -227,18 +224,18 @@ class Log(@volatile var dir: File,
val filename = file.getName
if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
// if the file ends in .deleted or .cleaned, delete it
- file.delete()
+ Files.deleteIfExists(file.toPath)
} else if(filename.endsWith(SwapFileSuffix)) {
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the .index file, complete the swap operation later
// if an index just delete it, it will be rebuilt
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
if(baseName.getPath.endsWith(IndexFileSuffix)) {
- file.delete()
+ Files.deleteIfExists(file.toPath)
} else if(baseName.getPath.endsWith(LogFileSuffix)){
// delete the index
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
- index.delete()
+ Files.deleteIfExists(index.toPath())
swapFiles += file
}
}
@@ -257,7 +254,7 @@ class Log(@volatile var dir: File,
if(!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
- file.delete()
+ Files.deleteIfExists(file.toPath)
}
} else if(filename.endsWith(LogFileSuffix)) {
// if its a log file, load the corresponding log segment
@@ -286,8 +283,8 @@ class Log(@volatile var dir: File,
case e: java.lang.IllegalArgumentException =>
warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
s"${indexFile.getAbsolutePath} and rebuilding index...")
- indexFile.delete()
- timeIndexFile.delete()
+ Files.deleteIfExists(timeIndexFile.toPath)
+ Files.delete(indexFile.toPath)
segment.recover(config.maxMessageSize)
}
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 469c46b..a9398f0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,6 +18,7 @@
package kafka.log
import java.io._
+import java.nio.file.Files
import java.util.concurrent._
import kafka.admin.AdminUtils
@@ -295,7 +296,7 @@ class LogManager(val logDirs: Array[File],
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
- CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
+ CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath))
}
} catch {
case e: ExecutionException => {
@@ -408,7 +409,7 @@ class LogManager(val logDirs: Array[File],
getLog(topicPartition).getOrElse {
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
- dir.mkdirs()
+ Files.createDirectories(dir.toPath)
val log = new Log(
dir = dir,
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index b77be34..4f055a6 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -17,6 +17,8 @@
package kafka.log
import java.io.{File, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import kafka.common._
@@ -468,9 +470,10 @@ class LogSegment(val log: FileRecords,
* Change the last modified time for this log segment
*/
def lastModified_=(ms: Long) = {
- log.file.setLastModified(ms)
- index.file.setLastModified(ms)
- timeIndex.file.setLastModified(ms)
+ val fileTime = FileTime.fromMillis(ms)
+ Files.setLastModifiedTime(log.file.toPath, fileTime)
+ Files.setLastModifiedTime(index.file.toPath, fileTime)
+ Files.setLastModifiedTime(timeIndex.file.toPath, fileTime)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index 686f692..81c20a7 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -22,6 +22,7 @@ package kafka.metrics
import com.yammer.metrics.Metrics
import java.io.File
+import java.nio.file.Files
import com.yammer.metrics.reporting.CsvReporter
import java.util.concurrent.TimeUnit
@@ -50,7 +51,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
val metricsConfig = new KafkaMetricsConfig(props)
csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
Utils.delete(csvDir)
- csvDir.mkdirs()
+ Files.createDirectories(csvDir.toPath())
underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
initialized = true
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index cc2c4cd..8630026 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.io._
+import java.nio.file.Files
import java.util.Properties
import kafka.utils._
import org.apache.kafka.common.utils.Utils
@@ -29,7 +30,7 @@ case class BrokerMetadata(brokerId: Int)
*/
class BrokerMetadataCheckpoint(val file: File) extends Logging {
private val lock = new Object()
- new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
+ Files.deleteIfExists(new File(file + ".tmp").toPath()) // try to delete any existing temp files for cleanliness
def write(brokerMetadata: BrokerMetadata) = {
lock synchronized {
@@ -39,10 +40,13 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
val temp = new File(file.getAbsolutePath + ".tmp")
val fileOutputStream = new FileOutputStream(temp)
- brokerMetaProps.store(fileOutputStream,"")
- fileOutputStream.flush()
- fileOutputStream.getFD().sync()
- fileOutputStream.close()
+ try {
+ brokerMetaProps.store(fileOutputStream, "")
+ fileOutputStream.flush()
+ fileOutputStream.getFD().sync()
+ } finally {
+ Utils.closeQuietly(fileOutputStream, temp.getName)
+ }
Utils.atomicMoveWithFallback(temp.toPath, file.toPath)
} catch {
case ie: IOException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index eb536f7..84772db 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -129,7 +129,7 @@ case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String
* @param apiKey API Key for the request
* @param time @Time object to use
*/
-class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
+final class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val apiKey: QuotaType,
private val time: Time) extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 890dde0..c7e95d9 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -18,7 +18,7 @@ package kafka.server.checkpoints
import java.io._
import java.nio.charset.StandardCharsets
-import java.nio.file.{FileSystems, Paths}
+import java.nio.file.{FileAlreadyExistsException, FileSystems, Files, Paths}
import kafka.utils.{Exit, Logging}
import org.apache.kafka.common.utils.Utils
import scala.collection.{Seq, mutable}
@@ -33,7 +33,9 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
private val path = file.toPath.toAbsolutePath
private val tempPath = Paths.get(path.toString + ".tmp")
private val lock = new Object()
- file.createNewFile()
+
+ try Files.createFile(file.toPath) // create the file if it doesn't exist
+ catch { case _: FileAlreadyExistsException => }
def write(entries: Seq[T]) {
lock synchronized {
@@ -111,4 +113,4 @@ class CheckpointFile[T](val file: File, version: Int, formatter: CheckpointFileF
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index bb9a65d..77d6bc1 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -77,21 +77,24 @@ object ImportZkOffsets extends Logging {
}
private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
- val fr = new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)
- val br = new BufferedReader(fr)
- var partOffsetsMap: Map[String,String] = Map()
-
- var s: String = br.readLine()
- while ( s != null && s.length() >= 1) {
- val tokens = s.split(":")
-
- partOffsetsMap += tokens(0) -> tokens(1)
- debug("adding node path [" + s + "]")
-
- s = br.readLine()
+ val br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))
+ try {
+ var partOffsetsMap: Map[String,String] = Map()
+
+ var s: String = br.readLine()
+ while ( s != null && s.length() >= 1) {
+ val tokens = s.split(":")
+
+ partOffsetsMap += tokens(0) -> tokens(1)
+ debug("adding node path [" + s + "]")
+
+ s = br.readLine()
+ }
+
+ partOffsetsMap
+ } finally {
+ br.close()
}
-
- partOffsetsMap
}
private def updateZkOffsets(zkUtils: ZkUtils, partitionOffsets: Map[String,String]): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 5d88b4e..d55ed6c 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -239,7 +239,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// Create consumers
val mirrorMakerConsumers = if (useOldConsumer) {
- val customRebalanceListener = {
+ val customRebalanceListener: Option[ConsumerRebalanceListener] = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
@@ -252,9 +252,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
None
}
}
-
- if (customRebalanceListener.exists(!_.isInstanceOf[ConsumerRebalanceListener]))
- throw new IllegalArgumentException("The rebalance listener should be an instance of kafka.consumer.ConsumerRebalanceListener")
createOldConsumers(
numStreams,
consumerProps,
@@ -262,7 +259,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
Option(options.valueOf(whitelistOpt)),
Option(options.valueOf(blacklistOpt)))
} else {
- val customRebalanceListener = {
+ val customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener] = {
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt)
if (customRebalanceListenerClass != null) {
val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt)
@@ -275,9 +272,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
None
}
}
- if (customRebalanceListener.exists(!_.isInstanceOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
- throw new IllegalArgumentException("The rebalance listener should be an instance of" +
- "org.apache.kafka.clients.consumer.ConsumerRebalanceListner")
createNewConsumers(
numStreams,
consumerProps,
@@ -518,6 +512,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
filterSpec: TopicFilter) extends MirrorMakerBaseConsumer {
private var iter: ConsumerIterator[Array[Byte], Array[Byte]] = null
private var immediateCommitRequested: Boolean = false
+ private var numCommitsNotified: Long = 0
override def init() {
// Creating one stream per each connector instance
@@ -532,7 +527,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// only wait() if mirrorMakerConsumer has been initialized and it has not been cleaned up.
if (iter != null) {
immediateCommitRequested = true
- this.wait()
+ val nextNumCommitsNotified = numCommitsNotified + 1
+ do {
+ this.wait()
+ } while (numCommitsNotified < nextNumCommitsNotified)
}
}
}
@@ -540,6 +538,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def notifyCommit() {
this.synchronized {
immediateCommitRequested = false
+ numCommitsNotified = numCommitsNotified + 1
this.notifyAll()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/core/src/main/scala/kafka/utils/FileLock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/FileLock.scala b/core/src/main/scala/kafka/utils/FileLock.scala
index 896c300..d1edc83 100644
--- a/core/src/main/scala/kafka/utils/FileLock.scala
+++ b/core/src/main/scala/kafka/utils/FileLock.scala
@@ -18,6 +18,7 @@
import java.io._
import java.nio.channels._
+import java.nio.file.{FileAlreadyExistsException, Files}
/**
* A file lock a la flock/funlock
@@ -25,7 +26,10 @@ import java.nio.channels._
* The given path will be created and opened if it doesn't exist.
*/
class FileLock(val file: File) extends Logging {
- file.createNewFile() // create the file if it doesn't exist
+
+ try Files.createFile(file.toPath) // create the file if it doesn't exist
+ catch { case _: FileAlreadyExistsException => }
+
private val channel = new RandomAccessFile(file, "rw").getChannel()
private var flock: java.nio.channels.FileLock = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab148f39/gradle/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 0a64e5f..26af433 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -15,15 +15,97 @@
limitations under the License.
-->
+<!-- Findbugs filtering.
+
+Findbugs is a static code analysis tool run as part of the "check" phase of the build.
+This file dictates which categories of bugs and individual false positives that we supress.
+
+For a detailed description of findbugs bug categories, see http://findbugs.sourceforge.net/bugDescriptions.html
+-->
<FindBugsFilter>
- <!-- Exclude a few findbugs codes.
- EI: May expose internal representation by returning reference to mutable object.
- EI2: May expose internal representation by incorporating reference to mutable object.
- MS: Malicious code vulnerability.
- See http://findbugs.sourceforge.net/bugDescriptions.html for a full description of the findbugs codes.
- -->
- <Match>
- <Bug code="EI,EI2,MS"/>
+ <Match>
+ <!-- Disable warnings about mutable objects and the use of public fields.
+ EI_EXPOSE_REP: May expose internal representation by returning reference to mutable object
+ EI_EXPOSE_REP2: May expose internal representation by incorporating reference to mutable object
+ MS_PKGPROTECT: Field should be package protected -->
+ <Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2,MS_PKGPROTECT"/>
+ </Match>
+
+ <Match>
+ <!-- Disable warnings about System.exit, until we decide to stop using it.
+ DM_EXIT: Method invokes System.exit -->
+ <Bug pattern="DM_EXIT"/>
+ </Match>
+
+ <Match>
+ <!-- Disable warnings about the lack of equals() when compareTo() is implemented.
+ EQ_COMPARETO_USE_OBJECT_EQUALS: This class defines a compareTo method but no equals() method -->
+ <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS"/>
+ </Match>
+
+ <Match>
+ <!-- Findbugs tends to work a little bit better with Java than with Scala. We suppress
+ some categories of bug reports when using Scala, since findbugs generates huge
+ numbers of false positives in some of these categories when examining Scala code.
+
+ NP_LOAD_OF_KNOWN_NULL_VALUE: The variable referenced at this point is known to be null
+ due to an earlier check against null.
+ NP_NULL_PARAM_DEREF: Method call passes null for non-null parameter.
+ NP_NULL_ON_SOME_PATH: Possible null pointer dereference
+ SE_BAD_FIELD: Non-transient non-serializable instance field in serializable class.
+ DM_STRING_CTOR: Method invokes inefficient new String(String) constructor.
+ DM_NEW_FOR_GETCLASS: Method allocates an object, only to get the class object.
+ ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD: Write to static field from instance method.
+ DM_NUMBER_CTOR: Method invokes inefficient Number constructor; use static valueOf instead.
+ RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE: Nullcheck of value previously dereferenced.
+ RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE: Redundant nullcheck of value known to be non-null.
+ RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE: Redundant nullcheck of value known to be null.
+ RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT: Return value of method without side effect is ignored.
+ NM_CLASS_NAMING_CONVENTION: Class names should start with an upper case letter.
+ NM_METHOD_NAMING_CONVENTION: Method names should start with a lower case letter. -->
+ <Source name="~.*\.scala" />
+ <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE,NP_NULL_ON_SOME_PATH,NP_NULL_PARAM_DEREF,SE_BAD_FIELD,DM_STRING_CTOR,DM_NEW_FOR_GETCLASS,ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD,DM_NUMBER_CTOR,RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE,RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE,RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE,RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT,NM_CLASS_NAMING_CONVENTION,NM_METHOD_NAMING_CONVENTION"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for KAFKA-4897: LogCleaner#cleanSegments should not ignore failures to delete files.
+ TODO: remove this suppression when KAFKA-4897 is fixed. -->
+ <Class name="kafka.log.Cleaner"/>
+ <Method name="cleanSegments"/>
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for ignoring the return value of CountDownLatch#await. -->
+ <Class name="kafka.log.Cleaner"/>
+ <Method name="cleanOrSleep"/>
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for having the thread start in the constructor of the old, deprecated consumer. -->
+ <Class name="kafka.producer.Producer"/>
+ <Bug pattern="SC_START_IN_CTOR"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for the equals() method of NetworkClientBlockingOps. -->
+ <Class name="kafka.utils.NetworkClientBlockingOps"/>
+ <Bug pattern="EQ_UNUSUAL"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for auto-generated calls to instanceof in kafka.utils.Json -->
+ <Source name="Json.scala"/>
+ <Package name="kafka.utils"/>
+ <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+ </Match>
+
+ <Match>
+ <!-- Add a suppression for a false locking warning. -->
+ <Package name="kafka.consumer"/>
+ <Source name="ZookeeperConsumerConnector.scala"/>
+ <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
</Match>
<Match>