You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/16 18:45:20 UTC

[GitHub] [kafka] tang7526 opened a new pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

tang7526 opened a new pull request #10332:
URL: https://github.com/apache/kafka/pull/10332


   issue : https://issues.apache.org/jira/browse/KAFKA-10697
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r595764290



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());

Review comment:
       I split it into two assertEquals:
   `assertEquals(1, v5FromBytes.data().responses().size());`
   `assertEquals(1, topicProduceResponse.partitionResponses().size()); `
   
   I think these two `assertEquals` should equal to previous `assertEquals`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596103388



##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -185,17 +191,20 @@ class ProduceRequestTest extends BaseRequestTest {
       .setAcks((-1).toShort)
       .setTimeoutMs(3000)
       .setTransactionalId(null)).build())
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r595747248



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());

Review comment:
       Is this equal to previous check? The previous `assertEquals` checks the number of `topic partition`. By contrast, it check the number of `topic` now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
chia7712 merged pull request #10332:
URL: https://github.com/apache/kafka/pull/10332


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596103601



##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -110,21 +110,25 @@ class ProduceRequestTest extends BaseRequestTest {
       .setAcks((-1).toShort)
       .setTimeoutMs(3000)
       .setTransactionalId(null)).build())
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;
+    val tp = new TopicPartition(topicProduceResponse.name, partitionProduceResponse.index)
     assertEquals(topicPartition, tp)
-    assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
+    assertEquals(Errors.INVALID_TIMESTAMP, Errors.forCode(partitionProduceResponse.errorCode))
     // there are 3 records with InvalidTimestampException created from inner function createRecords
-    assertEquals(3, partitionResponse.recordErrors.size())
-    assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
-    assertEquals(1, partitionResponse.recordErrors.get(1).batchIndex)
-    assertEquals(2, partitionResponse.recordErrors.get(2).batchIndex)
-    for (recordError <- partitionResponse.recordErrors.asScala) {
-      assertNotNull(recordError.message)
+    assertEquals(3, partitionProduceResponse.recordErrors.size)
+    assertEquals(0, partitionProduceResponse.recordErrors.get(0).batchIndex)
+    assertEquals(1, partitionProduceResponse.recordErrors.get(1).batchIndex)
+    assertEquals(2, partitionProduceResponse.recordErrors.get(2).batchIndex)
+    for (recordError <- partitionProduceResponse.recordErrors.asScala) {

Review comment:
       Done.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
         assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597008060



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +42,42 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs());
+        assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode()));
+        assertEquals(null, partitionProduceResponse.errorMessage());

Review comment:
       Done.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +42,42 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs());
+        assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode()));
+        assertEquals(null, partitionProduceResponse.errorMessage());
+        assertTrue(partitionProduceResponse.recordErrors().isEmpty());
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
         ProduceResponse v2Response = new ProduceResponse(responseData, 10);
         assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero");
         assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10");
         assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10");
-        assertEquals(responseData, v0Response.responses(), "Response data does not match");

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#issuecomment-800730712


   @chia7712 Could you help review this PR?  Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r595897827



##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -155,10 +155,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       classOf[PrincipalBuilder].getName)
   }
 
-  @nowarn("cat=deprecation")
   val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
-    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+      Errors.forCode(
+        resp.data
+          .responses.asScala.find(_.name == topic).get

Review comment:
       How about using `.responses.find(topic)`?

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -61,14 +59,17 @@ class ProduceRequestTest extends BaseRequestTest {
             .setAcks((-1).toShort)
             .setTimeoutMs(3000)
             .setTransactionalId(null)).build())
-      assertEquals(1, produceResponse.responses.size)
-      val (tp, partitionResponse) = produceResponse.responses.asScala.head
+      assertEquals(1, produceResponse.data.responses.size)
+      val topicProduceResponse = produceResponse.data.responses.asScala.head
+      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##########
@@ -154,11 +153,12 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
     assertEquals(0, responseBuffer.remaining, "The response should parse completely")
     assertEquals(correlationId, responseHeader.correlationId, "The correlationId should match request")
-    assertEquals(1, produceResponse.responses.size, "One partition response should be returned")
-
-    val partitionResponse = produceResponse.responses.get(topicPartition)
-    assertNotNull(partitionResponse)
-    assertEquals(Errors.NONE, partitionResponse.error, "There should be no error")
+    assertEquals(1, produceResponse.data.responses.size, "One topic response should be returned")
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size, "One partition response should be returned")    
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -185,17 +191,20 @@ class ProduceRequestTest extends BaseRequestTest {
       .setAcks((-1).toShort)
       .setTimeoutMs(3000)
       .setTransactionalId(null)).build())
-    assertEquals(1, produceResponse.responses.size)
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -110,21 +110,25 @@ class ProduceRequestTest extends BaseRequestTest {
       .setAcks((-1).toShort)
       .setTimeoutMs(3000)
       .setTransactionalId(null)).build())
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;
+    val tp = new TopicPartition(topicProduceResponse.name, partitionProduceResponse.index)
     assertEquals(topicPartition, tp)
-    assertEquals(Errors.INVALID_TIMESTAMP, partitionResponse.error)
+    assertEquals(Errors.INVALID_TIMESTAMP, Errors.forCode(partitionProduceResponse.errorCode))
     // there are 3 records with InvalidTimestampException created from inner function createRecords
-    assertEquals(3, partitionResponse.recordErrors.size())
-    assertEquals(0, partitionResponse.recordErrors.get(0).batchIndex)
-    assertEquals(1, partitionResponse.recordErrors.get(1).batchIndex)
-    assertEquals(2, partitionResponse.recordErrors.get(2).batchIndex)
-    for (recordError <- partitionResponse.recordErrors.asScala) {
-      assertNotNull(recordError.message)
+    assertEquals(3, partitionProduceResponse.recordErrors.size)
+    assertEquals(0, partitionProduceResponse.recordErrors.get(0).batchIndex)
+    assertEquals(1, partitionProduceResponse.recordErrors.get(1).batchIndex)
+    assertEquals(2, partitionProduceResponse.recordErrors.get(2).batchIndex)
+    for (recordError <- partitionProduceResponse.recordErrors.asScala) {

Review comment:
       Could you rewrite it by `foreach`?
   ```scala
   partitionProduceResponse.recordErrors.asScala.foreach(recordError => assertNotNull(recordError.batchIndexErrorMessage))
   ```

##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -337,12 +336,13 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  @nowarn("cat=deprecation")
   private def verifyConnection(socket: Socket): Unit = {
     val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket)
-    assertEquals(1, produceResponse.responses.size)
-    val (_, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse.error)
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)    
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -152,8 +156,11 @@ class ProduceRequestTest extends BaseRequestTest {
       .setTransactionalId(null)).build()
 
     val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
-    assertEquals(1, produceResponse.responses.size)
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResponse.responses.asScala.head._2.error)
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
         assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());

Review comment:
       This check includes `logAppendTime`, `error code`, `error message` and `recordErrors`. Could you add them back?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1455,10 +1453,11 @@ class KafkaApisTest {
 
       val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
 
-      assertEquals(1, response.responses().size())
-      for (partitionResponse <- response.responses().asScala) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH, partitionResponse._2.error)
-      }
+      assertEquals(1, response.data.responses.size)
+      val topicProduceResponse = response.data.responses.asScala.head
+      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -110,21 +110,25 @@ class ProduceRequestTest extends BaseRequestTest {
       .setAcks((-1).toShort)
       .setTimeoutMs(3000)
       .setTransactionalId(null)).build())
-    val (tp, partitionResponse) = produceResponse.responses.asScala.head
+
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       redundant ";"

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -43,12 +42,11 @@ class ProduceRequestTest extends BaseRequestTest {
 
   val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
-  @nowarn("cat=deprecation")
   @Test
   def testSimpleProduceRequest(): Unit = {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
-    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponseData.PartitionProduceResponse = {

Review comment:
       Could you remove the returned type if it is not used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596291543



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +42,42 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs());
+        assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode()));
+        assertEquals(null, partitionProduceResponse.errorMessage());

Review comment:
       Could you use `assertNull`?

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +42,42 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());
+        ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next();
+        assertEquals(1, topicProduceResponse.partitionResponses().size());  
+        ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next();
+        TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index());
+        assertEquals(tp0, tp);
+
+        assertEquals(100, partitionProduceResponse.logStartOffset());
+        assertEquals(10000, partitionProduceResponse.baseOffset());
+        assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs());
+        assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode()));
+        assertEquals(null, partitionProduceResponse.errorMessage());
+        assertTrue(partitionProduceResponse.recordErrors().isEmpty());
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
         ProduceResponse v0Response = new ProduceResponse(responseData);
         ProduceResponse v1Response = new ProduceResponse(responseData, 10);
         ProduceResponse v2Response = new ProduceResponse(responseData, 10);
         assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero");
         assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10");
         assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10");
-        assertEquals(responseData, v0Response.responses(), "Response data does not match");

Review comment:
       Could you add similar checks?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596102985



##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -337,12 +336,13 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     }
   }
 
-  @nowarn("cat=deprecation")
   private def verifyConnection(socket: Socket): Unit = {
     val produceResponse = sendAndReceive[ProduceResponse](produceRequest, socket)
-    assertEquals(1, produceResponse.responses.size)
-    val (_, partitionResponse) = produceResponse.responses.asScala.head
-    assertEquals(Errors.NONE, partitionResponse.error)
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)    
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.

##########
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##########
@@ -154,11 +153,12 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
 
     assertEquals(0, responseBuffer.remaining, "The response should parse completely")
     assertEquals(correlationId, responseHeader.correlationId, "The correlationId should match request")
-    assertEquals(1, produceResponse.responses.size, "One partition response should be returned")
-
-    val partitionResponse = produceResponse.responses.get(topicPartition)
-    assertNotNull(partitionResponse)
-    assertEquals(Errors.NONE, partitionResponse.error, "There should be no error")
+    assertEquals(1, produceResponse.data.responses.size, "One topic response should be returned")
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size, "One partition response should be returned")    
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596103233



##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -61,14 +59,17 @@ class ProduceRequestTest extends BaseRequestTest {
             .setAcks((-1).toShort)
             .setTimeoutMs(3000)
             .setTransactionalId(null)).build())
-      assertEquals(1, produceResponse.responses.size)
-      val (tp, partitionResponse) = produceResponse.responses.asScala.head
+      assertEquals(1, produceResponse.data.responses.size)
+      val topicProduceResponse = produceResponse.data.responses.asScala.head
+      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -152,8 +156,11 @@ class ProduceRequestTest extends BaseRequestTest {
       .setTransactionalId(null)).build()
 
     val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
-    assertEquals(1, produceResponse.responses.size)
-    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResponse.responses.asScala.head._2.error)
+    assertEquals(1, produceResponse.data.responses.size)
+    val topicProduceResponse = produceResponse.data.responses.asScala.head
+    assertEquals(1, topicProduceResponse.partitionResponses.size)   
+    val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.

##########
File path: core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
##########
@@ -43,12 +42,11 @@ class ProduceRequestTest extends BaseRequestTest {
 
   val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
-  @nowarn("cat=deprecation")
   @Test
   def testSimpleProduceRequest(): Unit = {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
-    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse = {
+    def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponseData.PartitionProduceResponse = {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597042561



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -78,6 +80,20 @@ public void produceResponseVersionTest() {
         assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero");
         assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10");
         assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10");
+
+        List<ProduceResponse> arrResponse = Arrays.asList(v0Response, v1Response, v2Response);
+        for(ProduceResponse produceResponse:arrResponse) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r595764290



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());

Review comment:
       I split it into two `assertEquals`:
   `assertEquals(1, v5FromBytes.data().responses().size());`
   `assertEquals(1, topicProduceResponse.partitionResponses().size()); `
   
   I think these two `assertEquals` should equal to previous `assertEquals`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597031230



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -78,6 +80,20 @@ public void produceResponseVersionTest() {
         assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero");
         assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10");
         assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10");
+
+        List<ProduceResponse> arrResponse = Arrays.asList(v0Response, v1Response, v2Response);
+        for(ProduceResponse produceResponse:arrResponse) {

Review comment:
       code style: `produceResponse : arrResponse`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r595764290



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##########
@@ -40,42 +41,39 @@
     public void produceResponseV5Test() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100));
 
         ProduceResponse v5Response = new ProduceResponse(responseData, 10);
         short version = 5;
 
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
         ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                buffer, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+        assertEquals(1, v5FromBytes.data().responses().size());

Review comment:
       I split it into two `assertEquals`:
   `assertEquals(1, v5FromBytes.data().responses().size());`
   `assertEquals(1, topicProduceResponse.partitionResponses().size()); `
   
   I think these two `assertEquals` equal to previous `assertEquals`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596103167



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1455,10 +1453,11 @@ class KafkaApisTest {
 
       val response = capturedResponse.getValue.asInstanceOf[ProduceResponse]
 
-      assertEquals(1, response.responses().size())
-      for (partitionResponse <- response.responses().asScala) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH, partitionResponse._2.error)
-      }
+      assertEquals(1, response.data.responses.size)
+      val topicProduceResponse = response.data.responses.asScala.head
+      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head;

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r596102909



##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -155,10 +155,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       classOf[PrincipalBuilder].getName)
   }
 
-  @nowarn("cat=deprecation")
   val requestKeyToError = (topicNames: Map[Uuid, String]) => Map[ApiKeys, Nothing => Errors](
     ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors.asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
-    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses.asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => {
+      Errors.forCode(
+        resp.data
+          .responses.asScala.find(_.name == topic).get

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org