You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/02/04 17:08:54 UTC

[kafka] branch 2.5 updated: KAFKA-9492; Ignore record errors in ProduceResponse for older versions (#8030)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 8c915fa  KAFKA-9492; Ignore record errors in ProduceResponse for older versions (#8030)
8c915fa is described below

commit 8c915fa2b46bdd80fb0cf5c74890bad94bb88af0
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Feb 4 17:01:08 2020 +0000

    KAFKA-9492; Ignore record errors in ProduceResponse for older versions (#8030)
    
    Fixes an NPE in brokers when processing record errors in a produce response for older versions, whose schemas do not define the record-errors field.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/common/requests/ProduceResponse.java     |  24 +++--
 .../kafka/common/requests/ProduceResponseTest.java | 115 +++++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  51 ---------
 3 files changed, 128 insertions(+), 62 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 8bfc629..7bbab08 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -222,7 +222,8 @@ public class ProduceResponse extends AbstractResponse {
                 int partition = partRespStruct.get(PARTITION_ID);
                 Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
                 long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME);
+                long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
+                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
                 long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
 
                 List<RecordError> recordErrors = Collections.emptyList();
@@ -274,19 +275,20 @@ public class ProduceResponse extends AbstractResponse {
                 partStruct.setIfExists(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime);
                 partStruct.setIfExists(LOG_START_OFFSET_FIELD, part.logStartOffset);
 
-                List<Struct> recordErrors = Collections.emptyList();
-                if (!part.recordErrors.isEmpty()) {
-                    recordErrors = new ArrayList<>();
-                    for (RecordError indexAndMessage : part.recordErrors) {
-                        Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
-                                .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
-                                .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
-                        recordErrors.add(indexAndMessageStruct);
+                if (partStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
+                    List<Struct> recordErrors = Collections.emptyList();
+                    if (!part.recordErrors.isEmpty()) {
+                        recordErrors = new ArrayList<>();
+                        for (RecordError indexAndMessage : part.recordErrors) {
+                            Struct indexAndMessageStruct = partStruct.instance(RECORD_ERRORS_KEY_NAME)
+                                    .set(BATCH_INDEX_KEY_NAME, indexAndMessage.batchIndex)
+                                    .set(BATCH_INDEX_ERROR_MESSAGE_FIELD, indexAndMessage.message);
+                            recordErrors.add(indexAndMessageStruct);
+                        }
                     }
+                    partStruct.set(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());
                 }
 
-                partStruct.setIfExists(RECORD_ERRORS_KEY_NAME, recordErrors.toArray());
-
                 partStruct.setIfExists(ERROR_MESSAGE_FIELD, part.errorMessage);
                 partitionArray.add(partStruct);
             }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
new file mode 100644
index 0000000..ea6e998
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ProduceResponseTest {
+
+    @Test
+    public void produceResponseV5Test() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
+                10000, RecordBatch.NO_TIMESTAMP, 100));
+
+        ProduceResponse v5Response = new ProduceResponse(responseData, 10);
+        short version = 5;
+
+        ByteBuffer buffer = v5Response.serialize(ApiKeys.PRODUCE, version, 0);
+        buffer.rewind();
+
+        ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
+
+        Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer);
+
+        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
+                deserializedStruct, version);
+
+        assertEquals(1, v5FromBytes.responses().size());
+        assertTrue(v5FromBytes.responses().containsKey(tp0));
+        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
+        assertEquals(100, partitionResponse.logStartOffset);
+        assertEquals(10000, partitionResponse.baseOffset);
+        assertEquals(10, v5FromBytes.throttleTimeMs());
+        assertEquals(responseData, v5Response.responses());
+    }
+
+    @Test
+    public void produceResponseVersionTest() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
+                10000, RecordBatch.NO_TIMESTAMP, 100));
+        ProduceResponse v0Response = new ProduceResponse(responseData);
+        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
+        ProduceResponse v2Response = new ProduceResponse(responseData, 10);
+        assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs());
+        assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs());
+        assertEquals("Throttle time must be 10", 10, v2Response.throttleTimeMs());
+        assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0),
+                v0Response.toStruct((short) 0).schema());
+        assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1),
+                v1Response.toStruct((short) 1).schema());
+        assertEquals("Should use schema version 2", ApiKeys.PRODUCE.responseSchema((short) 2),
+                v2Response.toStruct((short) 2).schema());
+        assertEquals("Response data does not match", responseData, v0Response.responses());
+        assertEquals("Response data does not match", responseData, v1Response.responses());
+        assertEquals("Response data does not match", responseData, v2Response.responses());
+    }
+
+    @Test
+    public void produceResponseRecordErrorsTest() {
+        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
+        TopicPartition tp = new TopicPartition("test", 0);
+        ProduceResponse.PartitionResponse partResponse = new ProduceResponse.PartitionResponse(Errors.NONE,
+                10000, RecordBatch.NO_TIMESTAMP, 100,
+                Collections.singletonList(new ProduceResponse.RecordError(3, "Record error")),
+                "Produce failed");
+        responseData.put(tp, partResponse);
+
+        for (short ver = 0; ver <= PRODUCE.latestVersion(); ver++) {
+            ProduceResponse response = new ProduceResponse(responseData);
+            Struct struct = response.toStruct(ver);
+            assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema());
+            ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp);
+            if (ver >= 8) {
+                assertEquals(1, deserialized.recordErrors.size());
+                assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
+                assertEquals("Record error", deserialized.recordErrors.get(0).message);
+                assertEquals("Produce failed", deserialized.errorMessage);
+            } else {
+                assertEquals(0, deserialized.recordErrors.size());
+                assertEquals(null, deserialized.errorMessage);
+            }
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2304fcf..8330e6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -603,57 +603,6 @@ public class RequestResponseTest {
     }
 
     @Test
-    public void produceResponseV5Test() {
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        TopicPartition tp0 = new TopicPartition("test", 0);
-        responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
-
-        ProduceResponse v5Response = new ProduceResponse(responseData, 10);
-        short version = 5;
-
-        ByteBuffer buffer = v5Response.serialize(ApiKeys.PRODUCE, version, 0);
-        buffer.rewind();
-
-        ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-
-        Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer);
-
-        ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-                deserializedStruct, version);
-
-        assertEquals(1, v5FromBytes.responses().size());
-        assertTrue(v5FromBytes.responses().containsKey(tp0));
-        ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0);
-        assertEquals(100, partitionResponse.logStartOffset);
-        assertEquals(10000, partitionResponse.baseOffset);
-        assertEquals(10, v5FromBytes.throttleTimeMs());
-        assertEquals(responseData, v5Response.responses());
-    }
-
-    @Test
-    public void produceResponseVersionTest() {
-        Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
-                10000, RecordBatch.NO_TIMESTAMP, 100));
-        ProduceResponse v0Response = new ProduceResponse(responseData);
-        ProduceResponse v1Response = new ProduceResponse(responseData, 10);
-        ProduceResponse v2Response = new ProduceResponse(responseData, 10);
-        assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs());
-        assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs());
-        assertEquals("Throttle time must be 10", 10, v2Response.throttleTimeMs());
-        assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0),
-                v0Response.toStruct((short) 0).schema());
-        assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1),
-                v1Response.toStruct((short) 1).schema());
-        assertEquals("Should use schema version 2", ApiKeys.PRODUCE.responseSchema((short) 2),
-                v2Response.toStruct((short) 2).schema());
-        assertEquals("Response data does not match", responseData, v0Response.responses());
-        assertEquals("Response data does not match", responseData, v1Response.responses());
-        assertEquals("Response data does not match", responseData, v2Response.responses());
-    }
-
-    @Test
     public void fetchResponseVersionTest() {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();