Posted to commits@kafka.apache.org by ij...@apache.org on 2016/11/18 13:54:56 UTC
[2/2] kafka git commit: KAFKA-4377; remove deprecated scala.collection.JavaConversions calls
JavaConversions is deprecated in Scala 2.12 in favour of JavaConverters.
Author: Bernard Leach <le...@bouncycastle.org>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2101 from leachbj/4377-java-converters
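JavaConverters makes each Java/Scala boundary explicit through .asScala/.asJava decorators, where JavaConversions inserted invisible implicit conversions. A minimal before/after sketch of the migration (illustrative only, not taken from this patch):

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._

object ConvertersSketch extends App {
  val javaList = new JArrayList[String]()
  javaList.add("broker-1")
  // Old style (deprecated): `import scala.collection.JavaConversions._` would let
  // `javaList.map(_.toUpperCase)` compile via a silent implicit conversion.
  // New style: the conversion is spelled out at the call site.
  val upper = javaList.asScala.map(_.toUpperCase)
  println(upper) // Buffer(BROKER-1)
}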
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e7ba700
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e7ba700
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e7ba700
Branch: refs/heads/trunk
Commit: 0e7ba70008fa8961ae55861a4dd3a5370d1d5a69
Parents: 079ea89
Author: Bernard Leach <le...@bouncycastle.org>
Authored: Fri Nov 18 04:10:43 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Nov 18 04:29:55 2016 +0000
----------------------------------------------------------------------
core/src/main/scala/kafka/Kafka.scala | 4 ++--
.../main/scala/kafka/admin/AdminClient.scala | 16 ++++++-------
.../src/main/scala/kafka/admin/AdminUtils.scala | 8 ++-----
.../main/scala/kafka/admin/ConfigCommand.scala | 24 ++++++++++----------
.../main/scala/kafka/admin/TopicCommand.scala | 16 ++++++-------
.../main/scala/kafka/api/FetchResponse.scala | 8 +++----
.../scala/kafka/consumer/BaseConsumer.scala | 22 ++++--------------
.../consumer/ZookeeperConsumerConnector.scala | 17 +++++++-------
.../consumer/ZookeeperTopicEventWatcher.scala | 8 +++----
.../controller/PartitionStateMachine.scala | 13 ++++-------
.../kafka/controller/ReplicaStateMachine.scala | 16 ++++++-------
.../kafka/javaapi/OffsetCommitRequest.scala | 7 ++----
.../kafka/javaapi/OffsetCommitResponse.scala | 7 ++----
.../kafka/javaapi/OffsetFetchRequest.scala | 10 ++------
.../kafka/javaapi/OffsetFetchResponse.scala | 7 ++----
.../scala/kafka/javaapi/OffsetRequest.scala | 8 ++-----
.../scala/kafka/javaapi/TopicMetadata.scala | 22 ++++--------------
.../kafka/javaapi/TopicMetadataRequest.scala | 7 ++----
.../consumer/ZookeeperConsumerConnector.scala | 11 ++++-----
core/src/main/scala/kafka/log/Log.scala | 13 ++++-------
core/src/main/scala/kafka/log/LogManager.scala | 7 +++---
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 3 ++-
.../scala/kafka/server/ClientQuotaManager.scala | 4 ++--
.../src/main/scala/kafka/server/KafkaApis.scala | 22 ++++++++----------
.../main/scala/kafka/server/KafkaConfig.scala | 19 +++-------------
.../scala/kafka/tools/ConsoleProducer.scala | 7 +++---
.../scala/kafka/tools/ConsumerPerformance.scala | 16 +++++++------
.../scala/kafka/tools/EndToEndLatency.scala | 11 ++++-----
.../scala/kafka/tools/ExportZkOffsets.scala | 5 ++--
core/src/main/scala/kafka/tools/JmxTool.scala | 8 +++----
.../main/scala/kafka/tools/MirrorMaker.scala | 6 ++---
.../scala/kafka/tools/ReplayLogProducer.scala | 4 ++--
.../scala/kafka/tools/SimpleConsumerShell.scala | 4 ++--
core/src/main/scala/kafka/utils/Pool.scala | 18 +++++----------
.../kafka/utils/VerifiableProperties.scala | 8 +++----
core/src/main/scala/kafka/utils/ZkUtils.scala | 11 +++------
.../integration/kafka/api/AdminClientTest.scala | 11 +++++----
.../kafka/api/ConsumerBounceTest.scala | 17 +++++++-------
.../other/kafka/TestPurgatoryPerformance.scala | 4 ++--
.../message/BaseMessageSetTestCases.scala | 10 +++-----
.../unit/kafka/log/BrokerCompressionTest.scala | 6 ++---
.../scala/unit/kafka/metrics/MetricsTest.scala | 6 ++---
.../kafka/server/ApiVersionsRequestTest.scala | 4 ++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +--
44 files changed, 181 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 6b551ce..88508b5 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -24,7 +24,7 @@ import kafka.server.{KafkaServer, KafkaServerStartable}
import kafka.utils.{CommandLineUtils, Logging}
import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
object Kafka extends Logging {
@@ -47,7 +47,7 @@ object Kafka extends Logging {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
- props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt)))
+ props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
}
props
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 592fecf..9cd4823 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -13,7 +13,7 @@
package kafka.admin
import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
import kafka.common.KafkaException
@@ -23,7 +23,6 @@ import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
-import org.apache.kafka.common.errors.DisconnectException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.types.Struct
@@ -32,7 +31,6 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
class AdminClient(val time: Time,
@@ -76,11 +74,11 @@ class AdminClient(val time: Time,
def listGroups(node: Node): List[GroupOverview] = {
val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse]
Errors.forCode(response.errorCode()).maybeThrow()
- response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+ response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
}
private def findAllBrokers(): List[Node] = {
- val request = new MetadataRequest(List[String]())
+ val request = new MetadataRequest(Collections.emptyList[String])
val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
val errors = response.errors()
if (!errors.isEmpty)
@@ -135,7 +133,7 @@ class AdminClient(val time: Time,
def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
val coordinator = findCoordinator(groupId)
- val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+ val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(Collections.singletonList(groupId)))
val response = responseBody.asInstanceOf[DescribeGroupsResponse]
val metadata = response.groups.get(groupId)
if (metadata == null)
@@ -144,11 +142,11 @@ class AdminClient(val time: Time,
throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
Errors.forCode(metadata.errorCode()).maybeThrow()
- val consumers = metadata.members.map { consumer =>
+ val consumers = metadata.members.asScala.map { consumer =>
ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
case "Stable" =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
- assignment.partitions.toList
+ assignment.partitions.asScala.toList
case _ =>
List()
})
@@ -190,7 +188,7 @@ object AdminClient {
config
}
- class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+ class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
def createSimplePlaintext(brokerUrl: String): AdminClient = {
val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d3ce217..e95d327 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
import scala.collection._
-import JavaConverters._
+import scala.collection.JavaConverters._
import mutable.ListBuffer
import scala.collection.mutable
import collection.Map
@@ -560,11 +560,7 @@ object AdminUtils extends Logging with AdminUtilities {
* Write out the entity config to zk, if there is any
*/
private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
- val configMap: mutable.Map[String, String] = {
- import JavaConversions._
- config
- }
- val map = Map("version" -> 1, "config" -> configMap)
+ val map = Map("version" -> 1, "config" -> config.asScala)
zkUtils.updatePersistentPath(entityPath, Json.encode(map))
}
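JavaConverters also decorates java.util.Properties directly, yielding a mutable.Map[String, String] view; that is what lets the writeEntityConfig body above collapse to config.asScala. A standalone illustration (hypothetical config key, not from the patch):

import java.util.Properties
import scala.collection.JavaConverters._

object PropertiesSketch extends App {
  val config = new Properties()
  config.setProperty("retention.ms", "86400000") // hypothetical topic override
  // asScala wraps the Properties in place; no copy is made.
  val asMap: scala.collection.mutable.Map[String, String] = config.asScala
  println(asMap("retention.ms")) // 86400000
}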
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 34df6b0..aa3780e 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -20,13 +20,13 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import kafka.common.Config
-import kafka.log.{LogConfig}
+import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, QuotaId}
import kafka.server.{DynamicConfig, ConfigType}
import kafka.utils.{CommandLineUtils, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection._
@@ -113,7 +113,7 @@ object ConfigCommand extends Config {
// When describing all users, don't include empty user nodes with only <user, client> quota overrides.
if (!configs.isEmpty || !describeAllUsers) {
println("Configs for %s are %s"
- .format(entity, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+ .format(entity, configs.asScala.map(kv => kv._1 + "=" + kv._2).mkString(",")))
}
}
}
@@ -138,7 +138,7 @@ object ConfigCommand extends Config {
private[admin] def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = {
if (opts.options.has(opts.deleteConfig)) {
- val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).map(_.trim())
+ val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfig).asScala.map(_.trim())
val propsToBeDeleted = new Properties
configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
configsToBeDeleted
@@ -214,7 +214,7 @@ object ConfigCommand extends Config {
}
private[admin] def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
- val entityTypes = opts.options.valuesOf(opts.entityType)
+ val entityTypes = opts.options.valuesOf(opts.entityType).asScala
if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client)
parseQuotaEntity(opts)
else {
@@ -225,9 +225,9 @@ object ConfigCommand extends Config {
}
private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
- val types = opts.options.valuesOf(opts.entityType)
+ val types = opts.options.valuesOf(opts.entityType).asScala
val namesIterator = opts.options.valuesOf(opts.entityName).iterator
- val names = opts.options.specs
+ val names = opts.options.specs.asScala
.filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
.map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "")
@@ -235,7 +235,7 @@ object ConfigCommand extends Config {
throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
val reverse = types.size == 2 && types(0) == ConfigType.Client
- val entityTypes = if (reverse) types.reverse else types.toBuffer
+ val entityTypes = if (reverse) types.reverse else types
val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator
def sanitizeName(entityType: String, name: String) = {
@@ -276,9 +276,9 @@ object ConfigCommand extends Config {
val nl = System.getProperty("line.separator")
val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity_type '" + ConfigType.Topic + "': " + LogConfig.configNames.map("\t" + _).mkString(nl, nl, nl) +
- "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.map("\t" + _).mkString(nl, nl, nl) +
- "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
- "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.map("\t" + _).mkString(nl, nl, nl) +
+ "For entity_type '" + ConfigType.Broker + "': " + DynamicConfig.Broker.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+ "For entity_type '" + ConfigType.User + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
+ "For entity_type '" + ConfigType.Client + "': " + DynamicConfig.Client.names.asScala.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
@@ -302,7 +302,7 @@ object ConfigCommand extends Config {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
- val entityTypeVals = options.valuesOf(entityType)
+ val entityTypeVals = options.valuesOf(entityType).asScala
if(options.has(alterOpt)) {
if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) {
if (!options.has(entityName) && !options.has(entityDefault))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 2fcc2ce..1078ba2 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,8 +20,8 @@ package kafka.admin
import java.util.Properties
import joptsimple._
import kafka.common.{AdminCommandFailedException, Topic}
-import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
-import kafka.log.{Defaults, LogConfig}
+import kafka.consumer.Whitelist
+import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils.ZkUtils._
import kafka.utils._
@@ -29,9 +29,9 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
-import scala.collection.JavaConversions._
+
+import scala.collection.JavaConverters._
import scala.collection._
-import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
object TopicCommand extends Logging {
@@ -199,8 +199,8 @@ object TopicCommand extends Logging {
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (describeConfigs) {
- val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
- if (!reportOverriddenConfigs || configs.size() != 0) {
+ val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+ if (!reportOverriddenConfigs || configs.nonEmpty) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
@@ -229,7 +229,7 @@ object TopicCommand extends Logging {
}
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
- val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))
+ val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
"Invalid topic config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
@@ -244,7 +244,7 @@ object TopicCommand extends Logging {
def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
if (opts.options.has(opts.deleteConfigOpt)) {
- val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+ val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).asScala.map(_.trim())
val propsToBeDeleted = new Properties
configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
LogConfig.validateNames(propsToBeDeleted)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d31d4ba..5e2a999 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -21,14 +21,14 @@ import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, MessageSet}
import kafka.api.ApiUtils._
import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.network.{Send, MultiSend}
+import org.apache.kafka.common.network.{MultiSend, Send}
import org.apache.kafka.common.protocol.Errors
import scala.collection._
-import JavaConverters._
+import scala.collection.JavaConverters._
object FetchResponsePartitionData {
def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -139,7 +139,7 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
buffer.rewind()
private val sends = new MultiSend(dest,
- JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2))))
+ topicData.partitionData.map(d => new PartitionDataSend(d._1, d._2): Send).asJava)
override def writeTo(channel: GatheringByteChannel): Long = {
if (completed)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 6e232a8..c1ee7cd 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -17,24 +17,14 @@
package kafka.consumer
-import java.util.Properties
+import java.util.{Collections, Properties}
import java.util.regex.Pattern
-import kafka.api.FetchRequestBuilder
import kafka.api.OffsetRequest
-import kafka.api.Request
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
import kafka.common.StreamEndException
import kafka.message.Message
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndOffset
-import kafka.utils.ToolsUtils
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.TopicPartition
/**
@@ -60,8 +50,6 @@ case class BaseConsumerRecord(topic: String,
class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
import org.apache.kafka.clients.consumer.KafkaConsumer
- import scala.collection.JavaConversions._
-
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
consumerInit()
var recordIter = consumer.poll(0).iterator
@@ -74,7 +62,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
// default to latest if no offset is provided
seek(topic, partitionId, OffsetRequest.LatestTime)
case (Some(topic), None, None, None) =>
- consumer.subscribe(List(topic))
+ consumer.subscribe(Collections.singletonList(topic))
case (None, None, None, Some(whitelist)) =>
consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
case _ =>
@@ -87,10 +75,10 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
def seek(topic: String, partitionId: Int, offset: Long) {
val topicPartition = new TopicPartition(topic, partitionId)
- consumer.assign(List(topicPartition))
+ consumer.assign(Collections.singletonList(topicPartition))
offset match {
- case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
- case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
+ case OffsetRequest.EarliestTime => consumer.seekToBeginning(Collections.singletonList(topicPartition))
+ case OffsetRequest.LatestTime => consumer.seekToEnd(Collections.singletonList(topicPartition))
case _ => consumer.seek(topicPartition, offset)
}
}
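Where the argument is a small fixed collection, the patch prefers java.util.Collections factories (singletonList, emptyList) over converting a Scala literal with .asJava, so no Scala wrapper is allocated at all. The two forms are interchangeable, as this standalone sketch shows (names assumed, not from the patch):

import java.util.Collections
import scala.collection.JavaConverters._

object SingletonSketch extends App {
  val viaConverters: java.util.List[String] = List("my-topic").asJava
  val viaCollections: java.util.List[String] = Collections.singletonList("my-topic")
  // Both satisfy the java.util.List equality contract, so they compare equal.
  println(viaConverters == viaCollections) // true
}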
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fbe7089..22a0c9a 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* This class handles the consumers interaction with zookeeper
@@ -696,9 +696,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (topicRegistry.size == 0)
new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
else
- mapAsJavaMap(topicRegistry.map(topics =>
- topics._1 -> topics._2.keys
- ).toMap).asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
+ topicRegistry.map(topics =>
+ topics._1 -> topics._2.keys // note this is incorrect, see KAFKA-2284
+ ).toMap.asJava.asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
)
}
releasePartitionOwnership(topicRegistry)
@@ -755,14 +755,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
case (topic, partitionOwnerShips) =>
val partitionOwnershipForTopicScalaMap = partitionOwnerShips.map({
case (topicAndPartition, consumerThreadId) =>
- topicAndPartition.partition -> consumerThreadId
- })
- topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnershipForTopicScalaMap.toSeq:_*))
- .asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]]
+ (topicAndPartition.partition: Integer) -> consumerThreadId
+ }).toMap
+ topic -> partitionOwnershipForTopicScalaMap.asJava
})
consumerRebalanceListener.beforeStartingFetchers(
consumerIdString,
- mapAsJavaMap(collection.mutable.Map(partitionAssigmentMapForCallback.toSeq:_*))
+ partitionAssigmentMapForCallback.asJava
)
}
updateFetcher(cluster)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index 0cd22f0..b9f2d41 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -17,9 +17,9 @@
package kafka.consumer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import kafka.utils.{ZkUtils, Logging}
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
+import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
import org.apache.zookeeper.Watcher.Event.KeeperState
class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
@@ -37,7 +37,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
new ZkSessionExpireListener(topicEventListener))
val topics = zkUtils.zkClient.subscribeChildChanges(
- ZkUtils.BrokerTopicsPath, topicEventListener).toList
+ ZkUtils.BrokerTopicsPath, topicEventListener)
// call to bootstrap topic list
topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics)
@@ -64,7 +64,7 @@ class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
lock.synchronized {
try {
if (zkUtils != null) {
- val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
+ val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).asScala
debug("all topics: %s".format(latestTopics))
eventHandler.handleTopicEvent(latestTopics)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ee94b46..a5285c3 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,8 +17,7 @@
package kafka.controller
import collection._
-import collection.JavaConversions
-import collection.mutable.Buffer
+import collection.JavaConverters._
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -416,9 +415,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if (hasStarted.get) {
try {
val currentChildren = {
- import JavaConversions._
- debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
- (children: Buffer[String]).toSet
+ debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(",")))
+ children.asScala.toSet
}
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
@@ -456,10 +454,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
inLock(controllerContext.controllerLock) {
- var topicsToBeDeleted = {
- import JavaConversions._
- (children: Buffer[String]).toSet
- }
+ var topicsToBeDeleted = children.asScala.toSet
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if(nonExistentTopics.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 03887ae..a26f95a 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -17,13 +17,13 @@
package kafka.controller
import collection._
-import collection.JavaConversions._
+import collection.JavaConverters._
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{TopicAndPartition, StateChangeFailedException}
-import kafka.utils.{ZkUtils, ReplicationUtils, Logging}
+
+import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.controller.Callbacks.CallbackBuilder
+import kafka.utils.{Logging, ReplicationUtils, ZkUtils}
import org.I0Itec.zkclient.IZkChildListener
-import org.apache.log4j.Logger
-import kafka.controller.Callbacks._
import kafka.utils.CoreUtils._
/**
@@ -300,7 +300,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
- val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
+ val deletionStates = Set[ReplicaState](ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
}
@@ -351,12 +351,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
- info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
+ info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.asScala.sorted.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
- val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+ val curBrokers = currentBrokerList.asScala.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 456c3c4..1924d5e 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -18,6 +18,7 @@
package kafka.javaapi
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import scala.collection.JavaConverters._
class OffsetCommitRequest(groupId: String,
requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
@@ -25,11 +26,7 @@ class OffsetCommitRequest(groupId: String,
clientId: String,
versionId: Short) {
val underlying = {
- val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
- import collection.JavaConversions._
-
- requestInfo.toMap
- }
+ val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = requestInfo.asScala.toMap
kafka.api.OffsetCommitRequest(
groupId = groupId,
requestInfo = scalaMap,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index b222329..c79f5b6 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -20,14 +20,11 @@ package kafka.javaapi
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
-import collection.JavaConversions
+import scala.collection.JavaConverters._
class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
- def errors: java.util.Map[TopicAndPartition, Short] = {
- import JavaConversions._
- underlying.commitStatus
- }
+ def errors: java.util.Map[TopicAndPartition, Short] = underlying.commitStatus.asJava
def hasError = underlying.hasError
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
index 818ae33..8eb0d47 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -18,9 +18,7 @@
package kafka.javaapi
import kafka.common.TopicAndPartition
-import scala.collection.mutable
-import collection.JavaConversions
-import java.nio.ByteBuffer
+import collection.JavaConverters._
class OffsetFetchRequest(groupId: String,
requestInfo: java.util.List[TopicAndPartition],
@@ -37,13 +35,9 @@ class OffsetFetchRequest(groupId: String,
}
val underlying = {
- val scalaSeq = {
- import JavaConversions._
- requestInfo: mutable.Buffer[TopicAndPartition]
- }
kafka.api.OffsetFetchRequest(
groupId = groupId,
- requestInfo = scalaSeq,
+ requestInfo = requestInfo.asScala,
versionId = versionId,
correlationId = correlationId,
clientId = clientId
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index c4bdb12..01aa8e8 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -20,14 +20,11 @@ package kafka.javaapi
import java.nio.ByteBuffer
import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
-import collection.JavaConversions
+import collection.JavaConverters._
class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) {
- def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = {
- import JavaConversions._
- underlying.requestInfo
- }
+ def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = underlying.requestInfo.asJava
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index c8a0ded..21997d3 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -19,8 +19,7 @@ package kafka.javaapi
import kafka.common.TopicAndPartition
import kafka.api.{Request, PartitionOffsetRequestInfo}
-import scala.collection.mutable
-import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
@@ -28,10 +27,7 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
clientId: String) {
val underlying = {
- val scalaMap = {
- import collection.JavaConversions._
- (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap
- }
+ val scalaMap = requestInfo.asScala.toMap
kafka.api.OffsetRequest(
requestInfo = scalaMap,
versionId = versionId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 4ef8321..4e2631f 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,20 +17,14 @@
package kafka.javaapi
import kafka.cluster.BrokerEndPoint
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
private[javaapi] object MetadataListImplicits {
implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
- java.util.List[kafka.javaapi.TopicMetadata] = {
- import JavaConversions._
- topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
- }
+ java.util.List[kafka.javaapi.TopicMetadata] = topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)).asJava
implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
- java.util.List[kafka.javaapi.PartitionMetadata] = {
- import JavaConversions._
- partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
- }
+ java.util.List[kafka.javaapi.PartitionMetadata] = partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)).asJava
}
class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -57,15 +51,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
underlying.leader
}
- def replicas: java.util.List[BrokerEndPoint] = {
- import JavaConversions._
- underlying.replicas
- }
+ def replicas: java.util.List[BrokerEndPoint] = underlying.replicas.asJava
- def isr: java.util.List[BrokerEndPoint] = {
- import JavaConversions._
- underlying.isr
- }
+ def isr: java.util.List[BrokerEndPoint] = underlying.isr.asJava
def errorCode: Short = underlying.errorCode
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index f625ba0..efd5405 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import kafka.api._
import org.apache.kafka.common.protocol.ApiKeys
-import scala.collection.mutable
+import scala.collection.JavaConverters._
class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
@@ -29,10 +29,7 @@ class TopicMetadataRequest(val versionId: Short,
val topics: java.util.List[String])
extends RequestOrResponse(Some(ApiKeys.METADATA.id)) {
- val underlying: kafka.api.TopicMetadataRequest = {
- import scala.collection.JavaConversions._
- new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
- }
+ val underlying: kafka.api.TopicMetadataRequest = new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics.asScala)
def this(topics: java.util.List[String]) =
this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 6347bfd..e145075 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -19,7 +19,7 @@ package kafka.javaapi.consumer
import kafka.serializer._
import kafka.consumer._
import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException}
-import scala.collection.{immutable, mutable, JavaConversions}
+import scala.collection.mutable
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
@@ -79,8 +79,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
throw new MessageStreamsExistException(this.getClass.getSimpleName +
" can create message streams at most once",null)
val scalaTopicCountMap: Map[String, Int] = {
- import JavaConversions._
- Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+ Map.empty[String, Int] ++ topicCountMap.asInstanceOf[java.util.Map[String, Int]].asScala
}
val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
@@ -96,10 +95,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
- def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
- import JavaConversions._
- underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
- }
+ def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
+ underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder).asJava
def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 6b57696..4c286d2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,7 +32,8 @@ import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, Cor
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetRequest
-import scala.collection.{Seq, JavaConversions}
+import scala.collection.Seq
+import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.Utils
@@ -902,23 +903,19 @@ class Log(val dir: File,
/**
* All the log segments in this log ordered from oldest to newest
*/
- def logSegments: Iterable[LogSegment] = {
- import JavaConversions._
- segments.values
- }
+ def logSegments: Iterable[LogSegment] = segments.values.asScala
/**
* Get all segments beginning with the segment that includes "from" and ending with the segment
* that includes up to "to-1" or the end of the log (if to > logEndOffset)
*/
def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
- import JavaConversions._
lock synchronized {
val floor = segments.floorKey(from)
if(floor eq null)
- segments.headMap(to).values
+ segments.headMap(to).values.asScala
else
- segments.subMap(floor, true, to, false).values
+ segments.subMap(floor, true, to, false).values.asScala
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 7806eda..a5beb49 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
+import scala.collection.JavaConverters._
import kafka.common.{KafkaException, TopicAndPartition}
import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
@@ -366,10 +367,10 @@ class LogManager(val logDirs: Array[File],
time)
logs.put(topicAndPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
- .format(topicAndPartition.topic,
- topicAndPartition.partition,
+ .format(topicAndPartition.topic,
+ topicAndPartition.partition,
dataDir.getAbsolutePath,
- {import JavaConversions._; config.originals.mkString(", ")}))
+ config.originals.asScala.mkString(", ")))
log
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 13b57e3..ede49c4 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -27,6 +27,7 @@ import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, Prod
import kafka.utils.Logging
import scala.collection.immutable
+import scala.collection.JavaConverters._
import javax.management.ObjectName
@@ -198,7 +199,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
metricNameList.foreach(metric => {
val pattern = (".*clientId=" + clientId + ".*").r
- val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
+ val registeredMetrics = Metrics.defaultRegistry().allMetrics().keySet().asScala
for (registeredMetric <- registeredMetrics) {
if (registeredMetric.getGroup == metric.getGroup &&
registeredMetric.getName == metric.getName &&
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 0c7c26b..eb536f7 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
import org.apache.kafka.common.utils.Time
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* Represents the sensors aggregated per client
@@ -442,7 +442,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
metric.config(getQuotaMetricConfig(newQuota))
}
} else {
- allMetrics.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
+ allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach {
case (metricName, metric) =>
val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else ""
val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else ""
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 94ae419..f1a9506 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -19,8 +19,8 @@ package kafka.server
import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
+import java.util.{Collections, Properties}
import java.util
-import java.util.Properties
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse, FetchResponsePartitionData}
@@ -345,10 +345,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- val mergedResponseStatus = responseStatus ++
+ val mergedResponseStatus = responseStatus ++
unauthorizedForWriteRequestInfo.mapValues(_ =>
- new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++
- nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+ new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++
+ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))
var errorInResponse = false
@@ -1002,15 +1002,13 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
- import JavaConversions._
-
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
- joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members)
+ joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
@@ -1025,11 +1023,11 @@ class KafkaApis(val requestChannel: RequestChannel,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
- Map.empty[String, ByteBuffer])
+ Collections.emptyMap())
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
} else {
// let the coordinator to handle join-group
- val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+ val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
(protocol.name, Utils.toArray(protocol.metadata))).toList
coordinator.handleJoinGroup(
joinGroupRequest.groupId,
@@ -1045,8 +1043,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSyncGroupRequest(request: RequestChannel.Request) {
- import JavaConversions._
-
val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
@@ -1061,7 +1057,7 @@ class KafkaApis(val requestChannel: RequestChannel,
syncGroupRequest.groupId(),
syncGroupRequest.generationId(),
syncGroupRequest.memberId(),
- syncGroupRequest.groupAssignment().mapValues(Utils.toArray),
+ syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
sendResponseCallback
)
}
@@ -1189,7 +1185,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
authorize(request.session, Delete, new Resource(auth.Topic, topic))
}
-
+
def sendResponseCallback(results: Map[String, Errors]): Unit = {
val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 80568b3..08e50dd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,8 +33,8 @@ import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.record.TimestampType
-import scala.collection.{JavaConverters, Map, immutable}
-import JavaConverters._
+import scala.collection.{Map, immutable}
+import scala.collection.JavaConverters._
object Defaults {
/** ********* Zookeeper Configuration ***********/
@@ -757,20 +757,7 @@ object KafkaConfig {
}
- def configNames() = {
- import scala.collection.JavaConversions._
- configDef.names().toList.sorted
- }
-
- /**
- * Check that property names are valid
- */
- def validateNames(props: Properties) {
- import scala.collection.JavaConversions._
- val names = configDef.names()
- for (name <- props.keys)
- require(names.contains(name), "Unknown Kafka configuration \"%s\".".format(name))
- }
+ def configNames() = configDef.names().asScala.toList.sorted
def fromProps(props: Properties): KafkaConfig =
fromProps(props, true)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index c536359..c6dcfce 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -29,6 +29,8 @@ import joptsimple._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.utils.Utils
+import scala.collection.JavaConverters._
+
object ConsoleProducer {
def main(args: Array[String]) {
@@ -251,7 +253,6 @@ object ConsoleProducer {
CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
- import scala.collection.JavaConversions._
val useOldProducer = options.has(useOldProducerOpt)
val topic = options.valueOf(topicOpt)
val brokerList = options.valueOf(brokerListOpt)
@@ -275,8 +276,8 @@ object ConsoleProducer {
val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val socketBuffer = options.valueOf(socketBufferSizeOpt)
- val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
- val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
+ val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
+ val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
/* new producer related configs */
val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2b3f56d..b7087f2 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -19,16 +19,18 @@ package kafka.tools
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
+
import org.apache.log4j.Logger
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.TopicPartition
import kafka.utils.CommandLineUtils
-import java.util.{ Random, Properties }
+import java.util.{Collections, Properties, Random}
+
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConnector
import kafka.consumer.KafkaStream
@@ -60,7 +62,7 @@ object ConsumerPerformance {
var startMs, endMs = 0L
if (!config.useOldConsumer) {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
- consumer.subscribe(List(config.topic))
+ consumer.subscribe(Collections.singletonList(config.topic))
startMs = System.currentTimeMillis
consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
endMs = System.currentTimeMillis
@@ -105,7 +107,7 @@ object ConsumerPerformance {
// Wait for group join, metadata fetch, etc
val joinTimeout = 10000
val isAssigned = new AtomicBoolean(false)
- consumer.subscribe(topics, new ConsumerRebalanceListener {
+ consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
isAssigned.set(true)
}
@@ -119,7 +121,7 @@ object ConsumerPerformance {
}
consumer.poll(100)
}
- consumer.seekToBeginning(List[TopicPartition]())
+ consumer.seekToBeginning(Collections.emptyList())
// Now start the benchmark
val startMs = System.currentTimeMillis
@@ -128,9 +130,9 @@ object ConsumerPerformance {
var currentTimeMillis = lastConsumedTime
while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
- val records = consumer.poll(100)
+ val records = consumer.poll(100).asScala
currentTimeMillis = System.currentTimeMillis
- if (records.count() > 0)
+ if (records.nonEmpty)
lastConsumedTime = currentTimeMillis
for (record <- records) {
messagesRead += 1
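
A trimmed sketch of the new-consumer idiom used above, assuming a broker at
localhost:9092 and a topic named "test" (both placeholders): Collections
builds the singleton and empty lists the Java client expects, so no asJava is
needed, while poll(...).asScala makes the returned records iterable from Scala.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PollSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "sketch-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.subscribe(Collections.singletonList("test"))  // no asJava needed
    consumer.poll(100)                                     // join the group
    consumer.seekToBeginning(Collections.emptyList())      // empty = all assigned partitions
    val records = consumer.poll(1000).asScala              // ConsumerRecords is a java.lang.Iterable
    if (records.nonEmpty)
      records.foreach(record => println(record.offset))
    consumer.close()
  }
}
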
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 9aaad3e..b9c9e3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -17,14 +17,13 @@
package kafka.tools
-import java.util.{Arrays, Properties}
+import java.util.{Arrays, Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.TopicPartition
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.util.Random
@@ -70,7 +69,7 @@ object EndToEndLatency {
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
- consumer.subscribe(List(topic))
+ consumer.subscribe(Collections.singletonList(topic))
val producerProps = loadProps
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -89,7 +88,7 @@ object EndToEndLatency {
//Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
//a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
- consumer.seekToEnd(List[TopicPartition]())
+ consumer.seekToEnd(Collections.emptyList())
consumer.poll(0)
var totalTime = 0.0
@@ -122,7 +121,7 @@ object EndToEndLatency {
//Check we only got the one message
if (recordIter.hasNext) {
- val count = 1 + recordIter.size
+ val count = 1 + recordIter.asScala.size
throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
}
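
The recordIter change in isolation: java.util.Iterator has no size method, so
counting the leftovers needs an explicit wrap. A self-contained sketch:

import java.util.Arrays
import scala.collection.JavaConverters._

object IteratorSizeSketch {
  def main(args: Array[String]): Unit = {
    val recordIter = Arrays.asList("a", "b", "c").iterator()
    recordIter.next()  // the one expected record
    if (recordIter.hasNext) {
      // asScala wraps the Java iterator; size consumes what remains
      val count = 1 + recordIter.asScala.size
      println(s"Only one result was expected during this test. We found [$count]")
    }
  }
}
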
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 907e8ac..0b11ec8 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -20,8 +20,8 @@ package kafka.tools
import java.io.FileWriter
import joptsimple._
import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.security.JaasUtils
+import scala.collection.JavaConverters._
/**
@@ -88,8 +88,7 @@ object ExportZkOffsets extends Logging {
consumerGroups = zkUtils.getChildren(ZkUtils.ConsumersPath).toList
}
else {
- import scala.collection.JavaConversions._
- consumerGroups = groups
+ consumerGroups = groups.asScala
}
for (consumerGrp <- consumerGroups) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 1dcfb19..3538874 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import joptsimple.OptionParser
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{CommandLineUtils, Logging}
@@ -85,11 +85,11 @@ object JmxTool extends Logging {
val queries: Iterable[ObjectName] =
if(options.has(objectNameOpt))
- options.valuesOf(objectNameOpt).map(new ObjectName(_))
+ options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_))
else
List(null)
- val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName])
+ val names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala)
val numExpectedAttributes: Map[ObjectName, Int] =
if (attributesWhitelistExists)
@@ -123,7 +123,7 @@ object JmxTool extends Logging {
var attributes = new mutable.HashMap[String, Any]()
for(name <- names) {
val mbean = mbsc.getMBeanInfo(name)
- for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
+ for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
val attr = attrObj.asInstanceOf[Attribute]
attributesWhitelist match {
case Some(allowedAttributes) =>
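
A runnable reduction of the JmxTool pattern against the local platform MBean
server (the ObjectName is illustrative): both queryNames and getAttributes
return Java collections, each converted with one explicit asScala.

import java.lang.management.ManagementFactory
import javax.management.{Attribute, ObjectName}
import scala.collection.JavaConverters._

object JmxSketch {
  def main(args: Array[String]): Unit = {
    val mbsc = ManagementFactory.getPlatformMBeanServer
    // queryNames returns java.util.Set[ObjectName]
    val names = mbsc.queryNames(new ObjectName("java.lang:type=Memory"), null).asScala
    for (name <- names) {
      val attrNames = mbsc.getMBeanInfo(name).getAttributes.map(_.getName)
      // getAttributes returns an AttributeList, i.e. a java.util.List
      for (attrObj <- mbsc.getAttributes(name, attrNames).asScala) {
        val attr = attrObj.asInstanceOf[Attribute]
        println(s"${attr.getName} = ${attr.getValue}")
      }
    }
  }
}
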
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 1a6ba69..2cfcb95 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.Record
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
@@ -431,7 +431,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
val data = mirrorMakerConsumer.receive()
trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
val records = messageHandler.handle(data)
- records.foreach(producer.send)
+ records.asScala.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
} catch {
@@ -607,7 +607,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
override def commit() {
- consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))})
+ consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))}.asJava)
offsets.clear()
}
}
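
The commitSync change shows the opposite direction, asJava: offsets stay in a
Scala HashMap and are converted once at the client boundary. A sketch with
placeholder broker, group and topic names:

import java.util.Properties
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

object CommitSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "sketch-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val offsets = new HashMap[TopicPartition, Long]
    offsets.put(new TopicPartition("test", 0), 42L)
    // commitSync wants java.util.Map[TopicPartition, OffsetAndMetadata]:
    // build it on the Scala side, then a single asJava at the boundary
    consumer.commitSync(offsets.map { case (tp, offset) =>
      (tp, new OffsetAndMetadata(offset, ""))
    }.asJava)
    consumer.close()
  }
}
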
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 4e2c7ef..049f129 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -24,6 +24,7 @@ import kafka.consumer._
import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
import kafka.api.OffsetRequest
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+import scala.collection.JavaConverters._
object ReplayLogProducer extends Logging {
@@ -114,8 +115,7 @@ object ReplayLogProducer extends Logging {
val outputTopic = options.valueOf(outputTopicOpt)
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val isSync = options.has(syncOpt)
- import scala.collection.JavaConversions._
- val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+ val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 9368bda..1ce0289 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -24,7 +24,7 @@ import kafka.client.ClientUtils
import kafka.api.{FetchRequestBuilder, OffsetRequest, Request}
import kafka.cluster.BrokerEndPoint
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import kafka.common.{MessageFormatter, TopicAndPartition}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.utils.Utils
@@ -117,7 +117,7 @@ object SimpleConsumerShell extends Logging {
val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+ val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
val fetchRequestBuilder = new FetchRequestBuilder()
.clientId(clientId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index f4aa628..1d96238 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -17,10 +17,10 @@
package kafka.utils
-import java.util.ArrayList
import java.util.concurrent._
+
import collection.mutable
-import collection.JavaConversions
+import collection.JavaConverters._
import kafka.common.KafkaException
class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
@@ -72,16 +72,10 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
def remove(key: K, value: V): Boolean = pool.remove(key, value)
- def keys: mutable.Set[K] = {
- import JavaConversions._
- pool.keySet()
- }
-
- def values: Iterable[V] = {
- import JavaConversions._
- new ArrayList[V](pool.values())
- }
-
+ def keys: mutable.Set[K] = pool.keySet().asScala
+
+ def values: Iterable[V] = pool.values().asScala
+
def clear() { pool.clear() }
override def size = pool.size
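
One nuance worth noting in Pool: asScala returns live wrapper views, so keys
and values now track the underlying ConcurrentHashMap, whereas the removed
values implementation copied into an ArrayList snapshot. A sketch of the view
semantics:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object ViewSketch {
  def main(args: Array[String]): Unit = {
    val pool = new ConcurrentHashMap[String, Int]()
    pool.put("a", 1)
    val keys = pool.keySet().asScala  // a live view, not a snapshot
    pool.put("b", 2)
    println(keys.mkString(", "))      // shows both a and b
  }
}
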
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index 9600b0a..de4f654 100755
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -21,11 +21,12 @@ import java.util.Properties
import java.util.Collections
import scala.collection._
import kafka.message.{CompressionCodec, NoCompressionCodec}
+import scala.collection.JavaConverters._
class VerifiableProperties(val props: Properties) extends Logging {
private val referenceSet = mutable.HashSet[String]()
-
+
def this() = this(new Properties)
def containsKey(name: String): Boolean = {
@@ -215,10 +216,7 @@ class VerifiableProperties(val props: Properties) extends Logging {
def verify() {
info("Verifying properties")
- val propNames = {
- import JavaConversions._
- Collections.list(props.propertyNames).map(_.toString).sorted
- }
+ val propNames = Collections.list(props.propertyNames).asScala.map(_.toString).sorted
for(key <- propNames) {
if (!referenceSet.contains(key) && !key.startsWith("external"))
warn("Property %s is not valid".format(key))
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 787cb8f..de56fe2 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -38,6 +38,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
import scala.collection._
+import scala.collection.JavaConverters._
object ZkUtils {
val ConsumersPath = "/consumers"
@@ -630,17 +631,11 @@ class ZkUtils(val zkClient: ZkClient,
dataAndStat
}
- def getChildren(path: String): Seq[String] = {
- import scala.collection.JavaConversions._
- // triggers implicit conversion from java list to scala Seq
- zkClient.getChildren(path)
- }
+ def getChildren(path: String): Seq[String] = zkClient.getChildren(path).asScala
def getChildrenParentMayNotExist(path: String): Seq[String] = {
- import scala.collection.JavaConversions._
- // triggers implicit conversion from java list to scala Seq
try {
- zkClient.getChildren(path)
+ zkClient.getChildren(path).asScala
} catch {
case _: ZkNoNodeException => Nil
}
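
The getChildren change in isolation: ZkClient.getChildren returns
java.util.List[String], and asScala satisfies the Seq[String] return type
without an implicit conversion in scope. A sketch assuming a ZooKeeper
ensemble reachable at localhost:2181:

import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConverters._

object ZkChildrenSketch {
  def getChildren(zkClient: ZkClient, path: String): Seq[String] =
    zkClient.getChildren(path).asScala

  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("localhost:2181", 30000, 30000)
    try getChildren(zkClient, "/").foreach(println)
    finally zkClient.close()
  }
}
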
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index f13f59f..a62922c 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -16,15 +16,16 @@
*/
package kafka.api
+import java.util.Collections
+
import kafka.admin.AdminClient
import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, Logging}
+import kafka.utils.{Logging, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.junit.{Before, Test}
import org.junit.Assert._
-import scala.collection.JavaConversions._
class AdminClientTest extends IntegrationTestHarness with Logging {
@@ -63,7 +64,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testListGroups() {
- consumers.head.subscribe(List(topic))
+ consumers.head.subscribe(Collections.singletonList(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment.isEmpty
@@ -78,7 +79,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testGetConsumerGroupSummary() {
- consumers.head.subscribe(List(topic))
+ consumers.head.subscribe(Collections.singletonList(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment.isEmpty
@@ -97,7 +98,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeConsumerGroup() {
- consumers.head.subscribe(List(topic))
+ consumers.head.subscribe(Collections.singletonList(topic))
TestUtils.waitUntilTrue(() => {
consumers.head.poll(0)
!consumers.head.assignment.isEmpty
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 0900d43..3b81e25 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -13,18 +13,17 @@
package kafka.api
-import java.util
+import java.util.Collections
import kafka.server.KafkaConfig
import kafka.utils.{Logging, ShutdownableThread, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
-import org.junit.{Test, Before}
+import org.junit.{Before, Test}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@@ -81,13 +80,13 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
var consumed = 0L
val consumer = this.consumers.head
- consumer.subscribe(List(topic))
+ consumer.subscribe(Collections.singletonList(topic))
val scheduler = new BounceBrokerScheduler(numIters)
scheduler.start()
while (scheduler.isRunning.get()) {
- for (record <- consumer.poll(100)) {
+ for (record <- consumer.poll(100).asScala) {
assertEquals(consumed, record.offset())
consumed += 1
}
@@ -97,7 +96,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
assertEquals(consumer.position(tp), consumer.committed(tp).offset)
if (consumer.position(tp) == numRecords) {
- consumer.seekToBeginning(List[TopicPartition]())
+ consumer.seekToBeginning(Collections.emptyList())
consumed = 0
}
} catch {
@@ -118,7 +117,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.producers.foreach(_.close)
val consumer = this.consumers.head
- consumer.assign(List(tp))
+ consumer.assign(Collections.singletonList(tp))
consumer.seek(tp, 0)
// wait until all the followers have synced the last HW with leader
@@ -133,7 +132,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val coin = TestUtils.random.nextInt(3)
if (coin == 0) {
info("Seeking to end of log")
- consumer.seekToEnd(List[TopicPartition]())
+ consumer.seekToEnd(Collections.emptyList())
assertEquals(numRecords.toLong, consumer.position(tp))
} else if (coin == 1) {
val pos = TestUtils.random.nextInt(numRecords).toLong
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 6ccac29..7cb5b6e 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -27,7 +27,7 @@ import kafka.server.{DelayedOperationPurgatory, DelayedOperation}
import kafka.utils._
import scala.math._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
/**
* This is a benchmark test of the purgatory.
@@ -90,7 +90,7 @@ object TestPurgatoryPerformance {
val pct50 = options.valueOf(pct50Opt).doubleValue
val verbose = options.valueOf(verboseOpt).booleanValue
- val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().sortBy(_.getName)
+ val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala.sortBy(_.getName)
val osMXBean = ManagementFactory.getOperatingSystemMXBean
val latencySamples = new LatencySamples(1000000, pct75, pct50)
val intervalSamples = new IntervalSamples(1000000, requestRate)
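
The MXBean line above as a standalone sketch: getGarbageCollectorMXBeans
returns a java.util.List, and asScala is what brings Scala's sortBy into reach.

import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object GcBeansSketch {
  def main(args: Array[String]): Unit = {
    val gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans().asScala.sortBy(_.getName)
    gcMXBeans.foreach(b => println(s"${b.getName}: ${b.getCollectionCount} collections"))
  }
}
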
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 80f809e..4a1be11 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -22,15 +22,13 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.utils.TestUtils
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+import scala.collection.JavaConverters._
trait BaseMessageSetTestCases extends JUnitSuite {
val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
- def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
- import scala.collection.JavaConversions._
- messageSet.map(m => m.message).iterator
- }
+ def toMessageIterator(messageSet: MessageSet): Iterator[Message] = messageSet.asScala.map(m => m.message).iterator
@Test
def testWrittenEqualsRead {
@@ -40,7 +38,6 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testIteratorIsConsistent() {
- import scala.collection.JavaConversions._
val m = createMessageSet(messages)
// two iterators over the same set should give the same results
TestUtils.checkEquals(m.iterator, m.iterator)
@@ -48,7 +45,6 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testIteratorIsConsistentWithCompression() {
- import scala.collection.JavaConversions._
val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results
TestUtils.checkEquals(m.iterator, m.iterator)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index 7487bc5..a050bb3 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -28,7 +28,7 @@ import org.junit.runners.Parameterized.Parameters
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
import java.util.{Collection, Properties}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
@RunWith(value = classOf[Parameterized])
class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite {
@@ -73,8 +73,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
object BrokerCompressionTest {
@Parameters
def parameters: Collection[Array[String]] = {
- for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
+ (for (brokerCompression <- BrokerCompressionCodec.brokerCompressionOptions;
messageCompression <- CompressionType.values
- ) yield Array(messageCompression.name, brokerCompression)
+ ) yield Array(messageCompression.name, brokerCompression)).asJava
}
}
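
The parameters change wraps the whole for-comprehension so one trailing asJava
yields the java.util.Collection that JUnit's @Parameters requires. A reduction
with illustrative codec names:

import java.util.Collection
import scala.collection.JavaConverters._

object ParametersSketch {
  def parameters: Collection[Array[String]] = {
    (for (brokerCompression <- Seq("uncompressed", "gzip", "producer");
          messageCompression <- Seq("none", "gzip"))
      yield Array(messageCompression, brokerCompression)).asJava
  }

  def main(args: Array[String]): Unit =
    parameters.asScala.foreach(p => println(p.mkString(" / ")))
}
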
http://git-wip-us.apache.org/repos/asf/kafka/blob/0e7ba700/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index d82ec58..16f0636 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -31,7 +31,7 @@ import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import scala.collection._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.util.matching.Regex
import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
@@ -96,7 +96,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
def testClusterIdMetric(): Unit ={
// Check if clusterId metric exists.
val metrics = Metrics.defaultRegistry().allMetrics
- assertEquals(metrics.keySet.filter(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")).size, 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
}
@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -114,7 +114,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
private def checkTopicMetricsExists(topic: String): Boolean = {
val topicMetricRegex = new Regex(".*("+topic+")$")
val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
- for(metricGroup <- metricGroups) {
+ for(metricGroup <- metricGroups.asScala) {
if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
return true
}