You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/14 17:02:55 UTC
kafka git commit: KAFKA-5127; Replace pattern matching with foreach where the case None is ignored
Repository: kafka
Updated Branches:
refs/heads/trunk 2d2e9adb5 -> e39104547
KAFKA-5127; Replace pattern matching with foreach where the case None is ignored
Author: Balint Molnar <ba...@gmail.com>
Reviewers: Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2919 from baluchicken/KAFKA-5127
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3910454
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3910454
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3910454
Branch: refs/heads/trunk
Commit: e391045473f258562d55bbc07faafe75ec7213ac
Parents: 2d2e9ad
Author: Balint Molnar <ba...@gmail.com>
Authored: Fri Jul 14 09:57:01 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 14 09:57:14 2017 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 35 +++++-------
.../kafka/controller/KafkaController.scala | 44 +++++++--------
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 6 +-
.../kafka/network/RequestOrResponseSend.scala | 6 +-
.../producer/async/DefaultEventHandler.scala | 5 +-
.../main/scala/kafka/security/auth/Acl.scala | 26 ++++-----
.../main/scala/kafka/server/KafkaServer.scala | 58 ++++++++++----------
.../kafka/tools/ConsumerOffsetChecker.scala | 47 ++++++----------
.../kafka/tools/StateChangeLogMerger.scala | 18 +++---
.../scala/kafka/utils/ReplicationUtils.scala | 18 +++---
core/src/main/scala/kafka/utils/ZkUtils.scala | 48 +++++++---------
.../kafka/api/ConsumerBounceTest.scala | 10 ++--
.../util/ReplicaFetcherMockBlockingSend.scala | 5 +-
14 files changed, 133 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index ba2fce1..cdf730f 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,10 +220,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
try {
if (config.autoCommitEnable)
scheduler.shutdown()
- fetcher match {
- case Some(f) => f.stopConnections
- case None =>
- }
+ fetcher.foreach(_.stopConnections())
sendShutdownToAllQueues()
if (config.autoCommitEnable)
commitOffsets(true)
@@ -780,23 +777,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
messageStreams: Map[String,List[KafkaStream[_,_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
- fetcher match {
- case Some(f) =>
- f.stopConnections
- clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
- /**
- * here, we need to commit offsets before stopping the consumer from returning any more messages
- * from the current data chunk. Since partition ownership is not yet released, this commit offsets
- * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
- * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
- * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
- * successfully and the fetchers restart to fetch more data chunks
- **/
+ fetcher.foreach { f =>
+ f.stopConnections()
+ clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+ /**
+ * here, we need to commit offsets before stopping the consumer from returning any more messages
+ * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+ * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+ * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+ * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+ * successfully and the fetchers restart to fetch more data chunks
+ **/
if (config.autoCommitEnable) {
info("Committing all offsets after clearing the fetcher queues")
commitOffsets(true)
}
- case None =>
}
}
@@ -833,11 +828,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
info("Consumer " + consumerIdString + " selected partitions : " +
allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
- fetcher match {
- case Some(f) =>
- f.startConnections(allPartitionInfos, cluster)
- case None =>
- }
+ fetcher.foreach(_.startConnections(allPartitionInfos, cluster))
}
private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e7f98e5..0ba412b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1273,29 +1273,27 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
override def process(): Unit = {
if (!isActive) return
// check if this partition is still being reassigned or not
- controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
- case Some(reassignedPartitionContext) =>
- // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
- val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
- newLeaderAndIsrOpt match {
- case Some(leaderAndIsr) => // check if new replicas have joined ISR
- val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
- if(caughtUpReplicas == reassignedReplicas) {
- // resume the partition reassignment process
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
- "Resuming partition reassignment")
- onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
- }
- else {
- info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
- "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
- }
- case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
- .format(topicAndPartition, reassignedReplicas.mkString(",")))
- }
- case None =>
+ controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext =>
+ // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
+ val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
+ newLeaderAndIsrOpt match {
+ case Some(leaderAndIsr) => // check if new replicas have joined ISR
+ val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+ if(caughtUpReplicas == reassignedReplicas) {
+ // resume the partition reassignment process
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+ "Resuming partition reassignment")
+ onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+ }
+ else {
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+ "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+ }
+ case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+ .format(topicAndPartition, reassignedReplicas.mkString(",")))
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 60b9990..43fac19 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -198,7 +198,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
.format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
case None => // that means the partition was never in OnlinePartition state, this means the broker never
- // started a log for that partition and does not have a high watermark value for this partition
+ // started a log for that partition and does not have a high watermark value for this partition
}
}
replicaState.put(partitionAndReplica, OnlineReplica)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 0847625..ca623ae 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -68,11 +68,7 @@ trait KafkaMetricsGroup extends Logging {
val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
val tagsName = KafkaMetricsGroup.toMBeanName(tags)
- tagsName match {
- case Some(tn) =>
- nameBuilder.append(",").append(tn)
- case None =>
- }
+ tagsName.foreach(nameBuilder.append(",").append(_))
new MetricName(group, typeName, name, scope, nameBuilder.toString())
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
index 1bfbf53..7a14e5e 100644
--- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -27,11 +27,7 @@ import org.apache.kafka.common.network.NetworkSend
object RequestOrResponseSend {
def serialize(request: RequestOrResponse): ByteBuffer = {
val buffer = ByteBuffer.allocate(request.sizeInBytes + request.requestId.fold(0)(_ => 2))
- request.requestId match {
- case Some(requestId) =>
- buffer.putShort(requestId)
- case None =>
- }
+ request.requestId.foreach(buffer.putShort)
request.writeTo(buffer)
buffer.rewind()
buffer
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 77c3b7d..3e4eaa3 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -114,10 +114,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
case Some(messageSetPerBroker) =>
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedTopicPartitions.foreach(topicPartition => {
- messagesPerBrokerMap.get(topicPartition) match {
- case Some(data) => failedProduceRequests.appendAll(data)
- case None => // nothing
- }
+ messagesPerBrokerMap.get(topicPartition).foreach(failedProduceRequests.appendAll)
})
case None => // failed to group messages
messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index c23dd2d..f99a088 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -57,20 +57,18 @@ object Acl {
return collection.immutable.Set.empty[Acl]
var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
- Json.parseFull(aclJson) match {
- case Some(m) =>
- val aclMap = m.asInstanceOf[Map[String, Any]]
- //the acl json version.
- require(aclMap(VersionKey) == CurrentVersion)
- val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
- aclSet.foreach(item => {
- val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
- val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
- val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
- val host: String = item(HostsKey).asInstanceOf[String]
- acls += new Acl(principal, permissionType, host, operation)
- })
- case None =>
+ Json.parseFull(aclJson).foreach { m =>
+ val aclMap = m.asInstanceOf[Map[String, Any]]
+ //the acl json version.
+ require(aclMap(VersionKey) == CurrentVersion)
+ val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
+ aclSet.foreach(item => {
+ val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
+ val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
+ val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
+ val host: String = item(HostsKey).asInstanceOf[String]
+ acls += new Acl(principal, permissionType, host, operation)
+ })
}
acls.toSet
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a87750..cc34e14 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -410,21 +410,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
val controllerId = zkUtils.getController()
- zkUtils.getBrokerInfo(controllerId) match {
- case Some(broker) =>
- // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
- // attempt, connect to the most recent controller
- if (ioException || broker != prevController) {
+ //If this method returns None ignore and try again
+ zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+ // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+ // attempt, connect to the most recent controller
+ if (ioException || broker != prevController) {
- ioException = false
+ ioException = false
- if (prevController != null)
- networkClient.close(node(prevController).idString)
+ if (prevController != null)
+ networkClient.close(node(prevController).idString)
- prevController = broker
- metadataUpdater.setNodes(Seq(node(prevController)).asJava)
- }
- case None => //ignore and try again
+ prevController = broker
+ metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+ }
}
// 2. issue a controlled shutdown to the controller
@@ -483,24 +482,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request
val controllerId = zkUtils.getController()
- zkUtils.getBrokerInfo(controllerId) match {
- case Some(broker) =>
- if (channel == null || prevController == null || !prevController.equals(broker)) {
- // if this is the first attempt or if the controller has changed, create a channel to the most recent
- // controller
- if (channel != null)
- channel.disconnect()
-
- val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
- channel = new BlockingChannel(brokerEndPoint.host,
- brokerEndPoint.port,
- BlockingChannel.UseDefaultBufferSize,
- BlockingChannel.UseDefaultBufferSize,
- config.controllerSocketTimeoutMs)
- channel.connect()
- prevController = broker
- }
- case None => //ignore and try again
+ //If this method returns None ignore and try again
+ zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+ if (channel == null || prevController == null || !prevController.equals(broker)) {
+ // if this is the first attempt or if the controller has changed, create a channel to the most recent
+ // controller
+ if (channel != null)
+ channel.disconnect()
+
+ val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
+ channel = new BlockingChannel(brokerEndPoint.host,
+ brokerEndPoint.port,
+ BlockingChannel.UseDefaultBufferSize,
+ BlockingChannel.UseDefaultBufferSize,
+ config.controllerSocketTimeoutMs)
+ channel.connect()
+ prevController = broker
+ }
}
// 2. issue a controlled shutdown to the controller
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d5e29ac..87147dc 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -63,17 +63,15 @@ object ConsumerOffsetChecker extends Logging {
zkUtils.getLeaderForPartition(topic, producerId) match {
case Some(bid) =>
val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
- consumerOpt match {
- case Some(consumer) =>
- val topicAndPartition = TopicAndPartition(topic, producerId)
- val request =
- OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
- val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
- val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
- println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
- owner match {case Some(ownerStr) => ownerStr case None => "none"}))
- case None => // ignore
+ consumerOpt.foreach { consumer =>
+ val topicAndPartition = TopicAndPartition(topic, producerId)
+ val request =
+ OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+
+ val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
+ println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+ owner match {case Some(ownerStr) => ownerStr case None => "none"}))
}
case None =>
println("No broker for partition %s - %s".format(topic, producerId))
@@ -81,22 +79,18 @@ object ConsumerOffsetChecker extends Logging {
}
private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
- topicPidMap.get(topic) match {
- case Some(producerIds) =>
- producerIds.sorted.foreach {
- producerId => processPartition(zkUtils, group, topic, producerId)
+ topicPidMap.get(topic).foreach { producerIds =>
+ producerIds.sorted.foreach {
+ producerId => processPartition(zkUtils, group, topic, producerId)
}
- case None => // ignore
}
}
private def printBrokerInfo() {
println("BROKER INFO")
for ((bid, consumerOpt) <- consumerMap)
- consumerOpt match {
- case Some(consumer) =>
- println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
- case None => // ignore
+ consumerOpt.foreach { consumer =>
+ println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
}
}
@@ -197,23 +191,14 @@ object ConsumerOffsetChecker extends Logging {
if (options.has("broker-info"))
printBrokerInfo()
- for ((_, consumerOpt) <- consumerMap)
- consumerOpt match {
- case Some(consumer) => consumer.close()
- case None => // ignore
- }
+ consumerMap.values.flatten.foreach(_.close())
}
catch {
case t: Throwable =>
println("Exiting due to: %s.".format(t.getMessage))
}
finally {
- for (consumerOpt <- consumerMap.values) {
- consumerOpt match {
- case Some(consumer) => consumer.close()
- case None => // ignore
- }
- }
+ consumerMap.values.flatten.foreach(_.close())
if (zkUtils != null)
zkUtils.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index f2b929a..a3c80d1 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -167,18 +167,14 @@ object StateChangeLogMerger extends Logging {
def getNextLine(itr: Iterator[String]): LineIterator = {
while (itr != null && itr.hasNext) {
val nextLine = itr.next
- dateRegex.findFirstIn(nextLine) match {
- case Some(d) =>
- val date = dateFormat.parse(d)
- if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
- topicPartitionRegex.findFirstMatchIn(nextLine) match {
- case Some(matcher) =>
- if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
- return new LineIterator(nextLine, itr)
- case None =>
- }
+ dateRegex.findFirstIn(nextLine).foreach { d =>
+ val date = dateFormat.parse(d)
+ if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
+ topicPartitionRegex.findFirstMatchIn(nextLine).foreach { matcher =>
+ if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
+ return new LineIterator(nextLine, itr)
}
- case None =>
+ }
}
}
new LineIterator()
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index c0cb5aa..fe31d7f 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -53,16 +53,14 @@ object ReplicationUtils extends Logging {
val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
val writtenStat = writtenLeaderAndIsrInfo._2
val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
- writtenLeaderOpt match {
- case Some(writtenData) =>
- val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
- (expectedLeader,writtenLeader) match {
- case (Some(expectedLeader),Some(writtenLeader)) =>
- if(expectedLeader == writtenLeader)
- return (true, writtenStat.getVersion())
- case _ =>
- }
- case None =>
+ writtenLeaderOpt.foreach { writtenData =>
+ val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
+ (expectedLeader,writtenLeader) match {
+ case (Some(expectedLeader),Some(writtenLeader)) =>
+ if(expectedLeader == writtenLeader)
+ return (true, writtenStat.getVersion())
+ case _ =>
+ }
}
} catch {
case _: Exception =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e03893c..0035120 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -187,18 +187,14 @@ object ZkUtils {
def parseTopicsData(jsonData: String): Seq[String] = {
var topics = List.empty[String]
- Json.parseFull(jsonData) match {
- case Some(m) =>
- m.asInstanceOf[Map[String, Any]].get("topics") match {
- case Some(partitionsSeq) =>
- val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
- mapPartitionSeq.foreach(p => {
- val topic = p.get("topic").get.asInstanceOf[String]
- topics ++= List(topic)
- })
- case None =>
- }
- case None =>
+ Json.parseFull(jsonData).foreach { m =>
+ m.asInstanceOf[Map[String, Any]].get("topics").foreach { partitionsSeq =>
+ val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+ mapPartitionSeq.foreach(p => {
+ val topic = p.get("topic").get.asInstanceOf[String]
+ topics ++= List(topic)
+ })
+ }
}
topics
}
@@ -696,9 +692,8 @@ class ZkUtils(val zkClient: ZkClient,
def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
for(topicAndPartition <- topicAndPartitions) {
- ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
- case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
- case None =>
+ ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
+ ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
}
}
ret
@@ -708,21 +703,16 @@ class ZkUtils(val zkClient: ZkClient,
val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
topics.foreach { topic =>
val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
- jsonPartitionMapOpt match {
- case Some(jsonPartitionMap) =>
- Json.parseFull(jsonPartitionMap) match {
- case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
- case Some(repl) =>
- val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
- for((partition, replicas) <- replicaMap){
- ret.put(TopicAndPartition(topic, partition.toInt), replicas)
- debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
- }
- case None =>
- }
- case None =>
+ jsonPartitionMapOpt.foreach { jsonPartitionMap =>
+ Json.parseFull(jsonPartitionMap).foreach { m =>
+ m.asInstanceOf[Map[String, Any]].get("partitions").foreach { repl =>
+ val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
+ for((partition, replicas) <- replicaMap){
+ ret.put(TopicAndPartition(topic, partition.toInt), replicas)
+ debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+ }
}
- case None =>
+ }
}
}
ret
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 2fa4d15..d146e9d 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -386,13 +386,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
info("Closing consumer with timeout " + closeTimeoutMs + " ms.")
consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS)
val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos)
- maxCloseTimeMs match {
- case Some(ms) => assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
- case None =>
+ maxCloseTimeMs.foreach { ms =>
+ assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
}
- minCloseTimeMs match {
- case Some(ms) => assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms)
- case None =>
+ minCloseTimeMs.foreach { ms =>
+ assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms)
}
info("consumer.close() completed in " + timeTakenMs + " ms.")
}, 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index e04bd95..0692afb 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -48,10 +48,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
//Create a suitable response based on the API key
val response = requestBuilder.apiKey() match {
case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
- callback match {
- case Some(f) => f()
- case None => //nothing
- }
+ callback.foreach(_.apply())
epochFetchCount += 1
new OffsetsForLeaderEpochResponse(offsets)