You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/10/20 09:27:40 UTC

[kafka] branch 2.7 updated: MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new af15003  MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)
af15003 is described below

commit af150033b27c7822bfc995abafd6f91597e2e4ca
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Oct 20 10:15:01 2020 +0100

    MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../kafka/clients/producer/internals/Sender.java   |  4 +--
 .../clients/producer/internals/SenderTest.java     | 36 +++++++++++++++++++---
 .../kafka/api/PlaintextProducerSendTest.scala      | 18 ++++++-----
 3 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1300e5c..958f17f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -616,7 +616,7 @@ public class Sender implements Runnable {
                 else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                     exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
                 else
-                    exception = error.exception();
+                    exception = error.exception(response.errorMessage);
                 // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
                 // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
                 // thus it is not safe to reassign the sequence.
@@ -629,7 +629,7 @@ public class Sender implements Runnable {
                         batch.topicPartition);
                 } else {
                     log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
-                            "to request metadata update now", batch.topicPartition, error.exception().toString());
+                            "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
                 }
                 metadata.requestUpdate();
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 63ab680..85d8db8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -93,6 +94,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1726,7 +1728,7 @@ public class SenderTest {
             assertFalse(batchIterator.hasNext());
             assertEquals(expectedSequence, firstBatch.baseSequence());
             return true;
-        }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
+        }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset, null));
     }
 
     @Test
@@ -2129,7 +2131,7 @@ public class SenderTest {
 
         time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
 
-        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L, null));
         sender.runOnce();  // receive response (offset=0)
         assertEquals(0, client.inFlightRequestCount());
         assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -2432,6 +2434,29 @@ public class SenderTest {
         sender.runOnce();
     }
 
+    @Test
+    public void testDefaultErrorMessage() throws Exception {
+        verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0), Errors.INVALID_REQUEST.message());
+    }
+
+    @Test
+    public void testCustomErrorMessage() throws Exception {
+        String errorMessage = "testCustomErrorMessage";
+        verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
+    }
+
+    private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
+        Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
+        sender.runOnce(); // connect
+        sender.runOnce(); // send produce request
+        client.respond(response);
+        sender.runOnce();
+        sender.runOnce();
+        ExecutionException e1 = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
+        assertEquals(InvalidRequestException.class, e1.getCause().getClass());
+        assertEquals(expectedMessage, e1.getCause().getMessage());
+    }
+
     class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
 
         private TransactionResult requiredResult;
@@ -2527,8 +2552,9 @@ public class SenderTest {
                 null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
     }
 
-    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
-        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
+    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset,
+                RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
         return new ProduceResponse(partResp, throttleTimeMs);
     }
@@ -2544,7 +2570,7 @@ public class SenderTest {
 
     }
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
-        return produceResponse(tp, offset, error, throttleTimeMs, -1L);
+        return produceResponse(tp, offset, error, throttleTimeMs, -1L, null);
     }
 
     private TransactionManager createTransactionManager() {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 4e44f34..3b91c2c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -98,10 +98,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
 
     val producer = createProducer(brokerList = brokerList)
     try {
-      producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
-      fail("Should throw CorruptedRecordException")
-    } catch {
-      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+      val e = intercept[ExecutionException] {
+        producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      }.getCause
+      assertTrue(e.isInstanceOf[InvalidTimestampException])
+      assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
     } finally {
       producer.close()
     }
@@ -109,10 +110,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     // Test compressed messages.
     val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip")
     try {
-      compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
-      fail("Should throw CorruptedRecordException")
-    } catch {
-      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+      val e = intercept[ExecutionException] {
+        compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      }.getCause
+      assertTrue(e.isInstanceOf[InvalidTimestampException])
+      assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
     } finally {
       compressedProducer.close()
     }