You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 09:40:43 UTC

[GitHub] [kafka] chia7712 opened a new pull request #9755: MINOR: refactor SelectingIterator by scala iterator

chia7712 opened a new pull request #9755:
URL: https://github.com/apache/kafka/pull/9755


   `SelectingIterator` can be replaced by a Scala iterator with `filter`. Also, this PR adds a unit test for it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#discussion_r543356198



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -972,40 +972,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  class SelectingIterator(val partitions: util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]],
-                          val quota: ReplicationQuotaManager)
-                          extends util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]] {
-    val iter = partitions.entrySet().iterator()
-
-    var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
-
-    override def hasNext: Boolean = {
-      while ((nextElement == null) && iter.hasNext()) {
-        val element = iter.next()
-        if (quota.isThrottled(element.getKey)) {
-          nextElement = element
-        }
-      }
-      nextElement != null
-    }
-
-    override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
-      if (!hasNext()) throw new NoSuchElementException()
-      val element = nextElement
-      nextElement = null
-      element
-    }
-
-    override def remove(): Unit = throw new UnsupportedOperationException()
-  }
-
   // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
   // traffic doesn't exceed quota.
-  private def sizeOfThrottledPartitions(versionId: Short,
-                                        unconvertedResponse: FetchResponse[Records],
-                                        quota: ReplicationQuotaManager): Int = {
-    val iter = new SelectingIterator(unconvertedResponse.responseData, quota)
-    FetchResponse.sizeOf(versionId, iter)
+  private[server] def sizeOfThrottledPartitions(versionId: Short,
+                                                unconvertedResponse: FetchResponse[Records],
+                                                quota: ReplicationQuotaManager): Int = {
+    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet()
+      .iterator().asScala.filter(element => quota.isThrottled(element.getKey)).asJava)

Review comment:
       Nit: no need for `()` after `iterator`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {

Review comment:
       Seems like you could use `map` here?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {
+        case (tp, raw) =>
+          responseData.put(tp, new FetchResponse.PartitionData(Errors.NONE,
+            105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
+            MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))))
+      }
+      new FetchResponse(Errors.NONE, responseData, 100, 100)
+    }
+
+    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledData = Map(throttledPartition -> "throttledData")
+    val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
+      fetchResponse(throttledData).responseData().entrySet().iterator())

Review comment:
       Nit: no need for `()`.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3045,4 +3046,32 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      data.foreach {
+        case (tp, raw) =>
+          responseData.put(tp, new FetchResponse.PartitionData(Errors.NONE,
+            105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
+            MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8)))))
+      }
+      new FetchResponse(Errors.NONE, responseData, 100, 100)
+    }
+
+    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledData = Map(throttledPartition -> "throttledData")
+    val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
+      fetchResponse(throttledData).responseData().entrySet().iterator())
+
+    val response = fetchResponse(throttledData ++ Map(new TopicPartition("nonThrottledData", 0) -> "nonThrottledData"))
+
+    val quota = Mockito.mock(classOf[ReplicationQuotaManager])
+    Mockito.when(quota.isThrottled(ArgumentMatchers.any(classOf[TopicPartition])))
+      .thenAnswer(invocation => throttledPartition == invocation.getArgument(0).asInstanceOf[TopicPartition])
+
+    val kafkaApis = createKafkaApis()
+    assertEquals(expectedSize, kafkaApis.sizeOfThrottledPartitions(FetchResponseData.HIGHEST_SUPPORTED_VERSION, response, quota))

Review comment:
       Seems like we could move this method to some utility class or the `KafkaApis` companion object and then you don't need to mock `KafkaApis` at all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#issuecomment-748762656


   @ijuma Thanks for all your suggestions. I have addressed them in the latest commit. Please take a look!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#discussion_r543412101



##########
File path: core/src/main/scala/kafka/server/KafkaApisUtils.scala
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.record.Records
+import org.apache.kafka.common.requests.FetchResponse
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * the suites of helpers for KafkaApis class.
+ * We don't use companion object since KafkaApis is too fat.
+ */
+object KafkaApisUtils {

Review comment:
       Makes sense. I will replace the utils object with the companion object.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#discussion_r543399930



##########
File path: core/src/main/scala/kafka/server/KafkaApisUtils.scala
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.record.Records
+import org.apache.kafka.common.requests.FetchResponse
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * the suites of helpers for KafkaApis class.
+ * We don't use companion object since KafkaApis is too fat.
+ */
+object KafkaApisUtils {

Review comment:
       I don't think creating a `KafkaApisUtils` makes things any better since it's unclear when to use this and when to use `KafkaApis`. If there is no utilities class related to quotas and requests handling, I would put it in the `KafkaApis` companion object.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9755:
URL: https://github.com/apache/kafka/pull/9755#discussion_r546398998



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3530,3 +3494,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 }
+
+object KafkaApis {
+    // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication

Review comment:
       Indent is not right.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3530,3 +3494,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
 }
+
+object KafkaApis {
+    // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication
+  // traffic doesn't exceed quota.
+  private[server] def sizeOfThrottledPartitions(versionId: Short,
+                                                unconvertedResponse: FetchResponse[Records],
+                                                quota: ReplicationQuotaManager): Int = {
+    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet()
+      .iterator().asScala.filter(element => quota.isThrottled(element.getKey)).asJava)

Review comment:
       A couple of unnecessary ().

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3067,4 +3067,31 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]](data.map {
+        case (tp, raw) =>
+          tp -> new FetchResponse.PartitionData(Errors.NONE,
+            105, 105, 0, Optional.empty(), Collections.emptyList(), Optional.empty(),
+            MemoryRecords.withRecords(CompressionType.NONE,
+              new SimpleRecord(100, raw.getBytes(StandardCharsets.UTF_8))).asInstanceOf[Records])
+      }.toMap.asJava)
+      new FetchResponse(Errors.NONE, responseData, 100, 100)
+    }
+
+    val throttledPartition = new TopicPartition("throttledData", 0)
+    val throttledData = Map(throttledPartition -> "throttledData")
+    val expectedSize = FetchResponse.sizeOf(FetchResponseData.HIGHEST_SUPPORTED_VERSION,
+      fetchResponse(throttledData).responseData().entrySet().iterator)

Review comment:
       Unnecessary () here and in the inner method.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3067,4 +3067,31 @@ class KafkaApisTest {
       Errors.LOG_DIR_NOT_FOUND -> 1,
       Errors.INVALID_TOPIC_EXCEPTION -> 1).asJava, response.errorCounts)
   }
+
+  @Test
+  def testSizeOfThrottledPartitions(): Unit = {
+    def fetchResponse(data: Map[TopicPartition, String]): FetchResponse[Records] = {
+      val responseData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]](data.map {

Review comment:
       I think I'd move data to the next line to make it a bit more readable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #9755: MINOR: refactor SelectingIterator by scala iterator

Posted by GitBox <gi...@apache.org>.
chia7712 merged pull request #9755:
URL: https://github.com/apache/kafka/pull/9755


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org