You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/07/14 17:02:55 UTC

kafka git commit: KAFKA-5127; Replace pattern matching with foreach where the case None is ignored

Repository: kafka
Updated Branches:
  refs/heads/trunk 2d2e9adb5 -> e39104547


KAFKA-5127; Replace pattern matching with foreach where the case None is ignored

Author: Balint Molnar <ba...@gmail.com>

Reviewers: Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2919 from baluchicken/KAFKA-5127


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3910454
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3910454
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3910454

Branch: refs/heads/trunk
Commit: e391045473f258562d55bbc07faafe75ec7213ac
Parents: 2d2e9ad
Author: Balint Molnar <ba...@gmail.com>
Authored: Fri Jul 14 09:57:01 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 14 09:57:14 2017 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 35 +++++-------
 .../kafka/controller/KafkaController.scala      | 44 +++++++--------
 .../kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  6 +-
 .../kafka/network/RequestOrResponseSend.scala   |  6 +-
 .../producer/async/DefaultEventHandler.scala    |  5 +-
 .../main/scala/kafka/security/auth/Acl.scala    | 26 ++++-----
 .../main/scala/kafka/server/KafkaServer.scala   | 58 ++++++++++----------
 .../kafka/tools/ConsumerOffsetChecker.scala     | 47 ++++++----------
 .../kafka/tools/StateChangeLogMerger.scala      | 18 +++---
 .../scala/kafka/utils/ReplicationUtils.scala    | 18 +++---
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 48 +++++++---------
 .../kafka/api/ConsumerBounceTest.scala          | 10 ++--
 .../util/ReplicaFetcherMockBlockingSend.scala   |  5 +-
 14 files changed, 133 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index ba2fce1..cdf730f 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,10 +220,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         try {
           if (config.autoCommitEnable)
             scheduler.shutdown()
-          fetcher match {
-            case Some(f) => f.stopConnections
-            case None =>
-          }
+          fetcher.foreach(_.stopConnections())
           sendShutdownToAllQueues()
           if (config.autoCommitEnable)
             commitOffsets(true)
@@ -780,23 +777,21 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                        messageStreams: Map[String,List[KafkaStream[_,_]]],
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
-      fetcher match {
-        case Some(f) =>
-          f.stopConnections
-          clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-          /**
-          * here, we need to commit offsets before stopping the consumer from returning any more messages
-          * from the current data chunk. Since partition ownership is not yet released, this commit offsets
-          * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-          * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-          * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-          * successfully and the fetchers restart to fetch more data chunks
-          **/
+      fetcher.foreach { f =>
+        f.stopConnections()
+        clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+        /**
+        * here, we need to commit offsets before stopping the consumer from returning any more messages
+        * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+        * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+        * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+        * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+        * successfully and the fetchers restart to fetch more data chunks
+        **/
         if (config.autoCommitEnable) {
           info("Committing all offsets after clearing the fetcher queues")
           commitOffsets(true)
         }
-        case None =>
       }
     }
 
@@ -833,11 +828,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       info("Consumer " + consumerIdString + " selected partitions : " +
         allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))
 
-      fetcher match {
-        case Some(f) =>
-          f.startConnections(allPartitionInfos, cluster)
-        case None =>
-      }
+      fetcher.foreach(_.startConnections(allPartitionInfos, cluster))
     }
 
     private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e7f98e5..0ba412b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1273,29 +1273,27 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     override def process(): Unit = {
       if (!isActive) return
         // check if this partition is still being reassigned or not
-      controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
-        case Some(reassignedPartitionContext) =>
-          // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-          val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
-          newLeaderAndIsrOpt match {
-            case Some(leaderAndIsr) => // check if new replicas have joined ISR
-              val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-              if(caughtUpReplicas == reassignedReplicas) {
-                // resume the partition reassignment process
-                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                  "Resuming partition reassignment")
-                onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-              }
-              else {
-                info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                  .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                  "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
-              }
-            case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
-              .format(topicAndPartition, reassignedReplicas.mkString(",")))
-          }
-        case None =>
+      controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext =>
+        // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
+        val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
+        newLeaderAndIsrOpt match {
+          case Some(leaderAndIsr) => // check if new replicas have joined ISR
+            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+            if(caughtUpReplicas == reassignedReplicas) {
+              // resume the partition reassignment process
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+                "Resuming partition reassignment")
+              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+            }
+            else {
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
+                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+            }
+          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+            .format(topicAndPartition, reassignedReplicas.mkString(",")))
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 60b9990..43fac19 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -198,7 +198,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
-                  // started a log for that partition and does not have a high watermark value for this partition
+                             // started a log for that partition and does not have a high watermark value for this partition
               }
           }
           replicaState.put(partitionAndReplica, OnlineReplica)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index 0847625..ca623ae 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -68,11 +68,7 @@ trait KafkaMetricsGroup extends Logging {
 
     val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
     val tagsName = KafkaMetricsGroup.toMBeanName(tags)
-    tagsName match {
-      case Some(tn) =>
-        nameBuilder.append(",").append(tn)
-      case None =>
-    }
+    tagsName.foreach(nameBuilder.append(",").append(_))
 
     new MetricName(group, typeName, name, scope, nameBuilder.toString())
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
index 1bfbf53..7a14e5e 100644
--- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -27,11 +27,7 @@ import org.apache.kafka.common.network.NetworkSend
 object RequestOrResponseSend {
   def serialize(request: RequestOrResponse): ByteBuffer = {
     val buffer = ByteBuffer.allocate(request.sizeInBytes + request.requestId.fold(0)(_ => 2))
-    request.requestId match {
-      case Some(requestId) =>
-        buffer.putShort(requestId)
-      case None =>
-    }
+    request.requestId.foreach(buffer.putShort)
     request.writeTo(buffer)
     buffer.rewind()
     buffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 77c3b7d..3e4eaa3 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -114,10 +114,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
             case Some(messageSetPerBroker) =>
               val failedTopicPartitions = send(brokerid, messageSetPerBroker)
               failedTopicPartitions.foreach(topicPartition => {
-                messagesPerBrokerMap.get(topicPartition) match {
-                  case Some(data) => failedProduceRequests.appendAll(data)
-                  case None => // nothing
-                }
+                messagesPerBrokerMap.get(topicPartition).foreach(failedProduceRequests.appendAll)
               })
             case None => // failed to group messages
               messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/security/auth/Acl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index c23dd2d..f99a088 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -57,20 +57,18 @@ object Acl {
       return collection.immutable.Set.empty[Acl]
 
     var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]()
-    Json.parseFull(aclJson) match {
-      case Some(m) =>
-        val aclMap = m.asInstanceOf[Map[String, Any]]
-        //the acl json version.
-        require(aclMap(VersionKey) == CurrentVersion)
-        val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
-        aclSet.foreach(item => {
-          val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
-          val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
-          val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
-          val host: String = item(HostsKey).asInstanceOf[String]
-          acls += new Acl(principal, permissionType, host, operation)
-        })
-      case None =>
+    Json.parseFull(aclJson).foreach { m =>
+      val aclMap = m.asInstanceOf[Map[String, Any]]
+      //the acl json version.
+      require(aclMap(VersionKey) == CurrentVersion)
+      val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
+      aclSet.foreach(item => {
+        val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
+        val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
+        val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
+        val host: String = item(HostsKey).asInstanceOf[String]
+        acls += new Acl(principal, permissionType, host, operation)
+      })
     }
     acls.toSet
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a87750..cc34e14 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -410,21 +410,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           // Get the current controller info. This is to ensure we use the most recent info to issue the
           // controlled shutdown request
           val controllerId = zkUtils.getController()
-          zkUtils.getBrokerInfo(controllerId) match {
-            case Some(broker) =>
-              // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
-              // attempt, connect to the most recent controller
-              if (ioException || broker != prevController) {
+          //If this method returns None ignore and try again
+          zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+            // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
+            // attempt, connect to the most recent controller
+            if (ioException || broker != prevController) {
 
-                ioException = false
+              ioException = false
 
-                if (prevController != null)
-                  networkClient.close(node(prevController).idString)
+              if (prevController != null)
+                networkClient.close(node(prevController).idString)
 
-                prevController = broker
-                metadataUpdater.setNodes(Seq(node(prevController)).asJava)
-              }
-            case None => //ignore and try again
+              prevController = broker
+              metadataUpdater.setNodes(Seq(node(prevController)).asJava)
+            }
           }
 
           // 2. issue a controlled shutdown to the controller
@@ -483,24 +482,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           // Get the current controller info. This is to ensure we use the most recent info to issue the
           // controlled shutdown request
           val controllerId = zkUtils.getController()
-          zkUtils.getBrokerInfo(controllerId) match {
-            case Some(broker) =>
-              if (channel == null || prevController == null || !prevController.equals(broker)) {
-                // if this is the first attempt or if the controller has changed, create a channel to the most recent
-                // controller
-                if (channel != null)
-                  channel.disconnect()
-
-                val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
-                channel = new BlockingChannel(brokerEndPoint.host,
-                  brokerEndPoint.port,
-                  BlockingChannel.UseDefaultBufferSize,
-                  BlockingChannel.UseDefaultBufferSize,
-                  config.controllerSocketTimeoutMs)
-                channel.connect()
-                prevController = broker
-              }
-            case None => //ignore and try again
+          //If this method returns None ignore and try again
+          zkUtils.getBrokerInfo(controllerId).foreach { broker =>
+            if (channel == null || prevController == null || !prevController.equals(broker)) {
+              // if this is the first attempt or if the controller has changed, create a channel to the most recent
+              // controller
+              if (channel != null)
+                channel.disconnect()
+
+              val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
+              channel = new BlockingChannel(brokerEndPoint.host,
+                brokerEndPoint.port,
+                BlockingChannel.UseDefaultBufferSize,
+                BlockingChannel.UseDefaultBufferSize,
+                config.controllerSocketTimeoutMs)
+              channel.connect()
+              prevController = broker
+            }
           }
 
           // 2. issue a controlled shutdown to the controller

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index d5e29ac..87147dc 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -63,17 +63,15 @@ object ConsumerOffsetChecker extends Logging {
     zkUtils.getLeaderForPartition(topic, producerId) match {
       case Some(bid) =>
         val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
-        consumerOpt match {
-          case Some(consumer) =>
-            val topicAndPartition = TopicAndPartition(topic, producerId)
-            val request =
-              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-
-            val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
-            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
-                                                                   owner match {case Some(ownerStr) => ownerStr case None => "none"}))
-          case None => // ignore
+        consumerOpt.foreach { consumer =>
+          val topicAndPartition = TopicAndPartition(topic, producerId)
+          val request =
+            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+          val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+
+          val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
+          println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, producerId, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
+                                                                 owner match {case Some(ownerStr) => ownerStr case None => "none"}))
         }
       case None =>
         println("No broker for partition %s - %s".format(topic, producerId))
@@ -81,22 +79,18 @@ object ConsumerOffsetChecker extends Logging {
   }
 
   private def processTopic(zkUtils: ZkUtils, group: String, topic: String) {
-    topicPidMap.get(topic) match {
-      case Some(producerIds) =>
-        producerIds.sorted.foreach {
-          producerId => processPartition(zkUtils, group, topic, producerId)
+    topicPidMap.get(topic).foreach { producerIds =>
+      producerIds.sorted.foreach {
+        producerId => processPartition(zkUtils, group, topic, producerId)
         }
-      case None => // ignore
     }
   }
 
   private def printBrokerInfo() {
     println("BROKER INFO")
     for ((bid, consumerOpt) <- consumerMap)
-      consumerOpt match {
-        case Some(consumer) =>
-          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
-        case None => // ignore
+      consumerOpt.foreach { consumer =>
+        println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
       }
   }
 
@@ -197,23 +191,14 @@ object ConsumerOffsetChecker extends Logging {
       if (options.has("broker-info"))
         printBrokerInfo()
 
-      for ((_, consumerOpt) <- consumerMap)
-        consumerOpt match {
-          case Some(consumer) => consumer.close()
-          case None => // ignore
-        }
+      consumerMap.values.flatten.foreach(_.close())
     }
     catch {
       case t: Throwable =>
         println("Exiting due to: %s.".format(t.getMessage))
     }
     finally {
-      for (consumerOpt <- consumerMap.values) {
-        consumerOpt match {
-          case Some(consumer) => consumer.close()
-          case None => // ignore
-        }
-      }
+      consumerMap.values.flatten.foreach(_.close())
       if (zkUtils != null)
         zkUtils.close()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index f2b929a..a3c80d1 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -167,18 +167,14 @@ object StateChangeLogMerger extends Logging {
   def getNextLine(itr: Iterator[String]): LineIterator = {
     while (itr != null && itr.hasNext) {
       val nextLine = itr.next
-      dateRegex.findFirstIn(nextLine) match {
-        case Some(d) =>
-          val date = dateFormat.parse(d)
-          if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
-            topicPartitionRegex.findFirstMatchIn(nextLine) match {
-              case Some(matcher) =>
-                if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
-                  return new LineIterator(nextLine, itr)
-              case None =>
-            }
+      dateRegex.findFirstIn(nextLine).foreach { d =>
+        val date = dateFormat.parse(d)
+        if ((date.equals(startDate) || date.after(startDate)) && (date.equals(endDate) || date.before(endDate))) {
+          topicPartitionRegex.findFirstMatchIn(nextLine).foreach { matcher =>
+            if ((topic == null || topic == matcher.group(1)) && (partitions.isEmpty || partitions.contains(matcher.group(3).toInt)))
+              return new LineIterator(nextLine, itr)
           }
-        case None =>
+        }
       }
     }
     new LineIterator()

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index c0cb5aa..fe31d7f 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -53,16 +53,14 @@ object ReplicationUtils extends Logging {
       val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
       val writtenStat = writtenLeaderAndIsrInfo._2
       val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat)
-      writtenLeaderOpt match {
-        case Some(writtenData) =>
-          val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
-          (expectedLeader,writtenLeader) match {
-            case (Some(expectedLeader),Some(writtenLeader)) =>
-              if(expectedLeader == writtenLeader)
-                return (true, writtenStat.getVersion())
-            case _ =>
-          }
-        case None =>
+      writtenLeaderOpt.foreach { writtenData =>
+        val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
+        (expectedLeader,writtenLeader) match {
+          case (Some(expectedLeader),Some(writtenLeader)) =>
+            if(expectedLeader == writtenLeader)
+              return (true, writtenStat.getVersion())
+          case _ =>
+        }
       }
     } catch {
       case _: Exception =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index e03893c..0035120 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -187,18 +187,14 @@ object ZkUtils {
 
   def parseTopicsData(jsonData: String): Seq[String] = {
     var topics = List.empty[String]
-    Json.parseFull(jsonData) match {
-      case Some(m) =>
-        m.asInstanceOf[Map[String, Any]].get("topics") match {
-          case Some(partitionsSeq) =>
-            val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
-            mapPartitionSeq.foreach(p => {
-              val topic = p.get("topic").get.asInstanceOf[String]
-              topics ++= List(topic)
-            })
-          case None =>
-        }
-      case None =>
+    Json.parseFull(jsonData).foreach { m =>
+      m.asInstanceOf[Map[String, Any]].get("topics").foreach { partitionsSeq =>
+          val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]]
+          mapPartitionSeq.foreach(p => {
+            val topic = p.get("topic").get.asInstanceOf[String]
+            topics ++= List(topic)
+          })
+      }
     }
     topics
   }
@@ -696,9 +692,8 @@ class ZkUtils(val zkClient: ZkClient,
   def getPartitionLeaderAndIsrForTopics(topicAndPartitions: Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
-      ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition) match {
-        case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
-        case None =>
+      ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topicAndPartition.topic, topicAndPartition.partition).foreach { leaderIsrAndControllerEpoch =>
+        ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
       }
     }
     ret
@@ -708,21 +703,16 @@ class ZkUtils(val zkClient: ZkClient,
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
       val jsonPartitionMapOpt = readDataMaybeNull(getTopicPath(topic))._1
-      jsonPartitionMapOpt match {
-        case Some(jsonPartitionMap) =>
-          Json.parseFull(jsonPartitionMap) match {
-            case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match {
-              case Some(repl)  =>
-                val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
-                for((partition, replicas) <- replicaMap){
-                  ret.put(TopicAndPartition(topic, partition.toInt), replicas)
-                  debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-                }
-              case None =>
-            }
-            case None =>
+      jsonPartitionMapOpt.foreach { jsonPartitionMap =>
+        Json.parseFull(jsonPartitionMap).foreach { m =>
+          m.asInstanceOf[Map[String, Any]].get("partitions").foreach { repl =>
+              val replicaMap = repl.asInstanceOf[Map[String, Seq[Int]]]
+              for((partition, replicas) <- replicaMap){
+                ret.put(TopicAndPartition(topic, partition.toInt), replicas)
+                debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+              }
           }
-        case None =>
+        }
       }
     }
     ret

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 2fa4d15..d146e9d 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -386,13 +386,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       info("Closing consumer with timeout " + closeTimeoutMs + " ms.")
       consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS)
       val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos)
-      maxCloseTimeMs match {
-        case Some(ms) => assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
-        case None =>
+      maxCloseTimeMs.foreach { ms =>
+        assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
       }
-      minCloseTimeMs match {
-        case Some(ms) => assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms)
-        case None =>
+      minCloseTimeMs.foreach { ms =>
+        assertTrue("Close finished too quickly " + timeTakenMs, timeTakenMs >= ms)
       }
       info("consumer.close() completed in " + timeTakenMs + " ms.")
     }, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3910454/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index e04bd95..0692afb 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -48,10 +48,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
     //Create a suitable response based on the API key
     val response = requestBuilder.apiKey() match {
       case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
-        callback match {
-          case Some(f) => f()
-          case None => //nothing
-        }
+        callback.foreach(_.apply())
         epochFetchCount += 1
         new OffsetsForLeaderEpochResponse(offsets)