You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 13:56:24 UTC

[GitHub] [kafka] ijuma commented on a change in pull request #9755: MINOR: refactor SelectingIterator by scala iterator

ijuma commented on a change in pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#discussion_r543356198



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -972,40 +972,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]],
-                          val quota: ReplicationQuotaManager)
-                          extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] {
-    val iter = partitions.entrySet().iterator()
-
-    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
-
-    override def hasNext: Boolean = {
-      while ((nextElement == null) && iter.hasNext()) {
-        val element = iter.next()
-        if (quota.isThrottled(element.getKey)) {
-          nextElement = element
-        }
-      }
-      nextElement != null
-    }
-
-    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
-      if (!hasNext()) throw new NoSuchElementException()
-      val element = nextElement
-      nextElement = null
-      element
-    }
-
-    override def remove(): Unit = throw new UnsupportedOperationException()
-  }
-
   // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
   // traffic doesn't exceed quota.
-  private def sizeOfThrottledPartitions(versionId: Short,
-                                        unconvertedResponse: FetchResponse[Records],
-                                        quota: ReplicationQuotaManager): Int = {
-    val iter = new SelectingIterator(unconvertedResponse.responseData, quota)
-    FetchResponse.sizeOf(versionId, iter)
+  private[server] def sizeOfThrottledPartitions(versionId: Short,
+                                                unconvertedResponse: FetchResponse[Records],
+                                                quota: ReplicationQuotaManager): Int = {
+    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet()
+      .iterator().asScala.filter(element => quota.isThrottled(element.getKey)).asJava)

Review comment:
       Nit: no need for `()` after `iterator`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {

Review comment:
       Seems like you could use `map` here?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {
+        case (tp, raw) =>
+          responseData.put(tp, new FetchResponse.PartitionData(Errors.NONE,
+            105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
+            MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))))
+      }
+      new FetchResponse(Errors.NONE, responseData, 100, 100)
+    }
+
+    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledData = Map(throttledPartition -> "throttledData")
+    val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
+      fetchResponse(throttledData).responseData().entrySet().iterator())

Review comment:
       Nit: no need for `()`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {
+        case (tp, raw) =>
+          responseData.put(tp, new FetchResponse.PartitionData(Errors.NONE,
+            105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
+            MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))))
+      }
+      new FetchResponse(Errors.NONE, responseData, 100, 100)
+    }
+
+    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledData = Map(throttledPartition -> "throttledData")
+    val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
+      fetchResponse(throttledData).responseData().entrySet().iterator())
+
+    val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+
+    val quota = Mockito.mock(classOf[ReplicationQuotaManager])
+    Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
+      .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
+
+    val kafkaApis = createKafkaApis()
+    assertEquals(expectedSize, kafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))

Review comment:
       Seems like we could move this method to some utility class or the `KafkaApis` companion object and then you don't need to mock `KafkaApis` at all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org