You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/10/20 09:27:40 UTC
[kafka] branch 2.7 updated: MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new af15003 MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)
af15003 is described below
commit af150033b27c7822bfc995abafd6f91597e2e4ca
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Oct 20 10:15:01 2020 +0100
MINOR: Use `PartitionResponse.errorMessage` in exceptions in KafkaProducer (#9450)
Reviewers: Manikumar Reddy <ma...@gmail.com>, Chia-Ping Tsai <ch...@gmail.com>
---
.../kafka/clients/producer/internals/Sender.java | 4 +--
.../clients/producer/internals/SenderTest.java | 36 +++++++++++++++++++---
.../kafka/api/PlaintextProducerSendTest.scala | 18 ++++++-----
3 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1300e5c..958f17f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -616,7 +616,7 @@ public class Sender implements Runnable {
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else
- exception = error.exception();
+ exception = error.exception(response.errorMessage);
// tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
// its retries -- if it did, we don't know whether the sequence number was accepted or not, and
// thus it is not safe to reassign the sequence.
@@ -629,7 +629,7 @@ public class Sender implements Runnable {
batch.topicPartition);
} else {
log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
- "to request metadata update now", batch.topicPartition, error.exception().toString());
+ "to request metadata update now", batch.topicPartition, error.exception(response.errorMessage).toString());
}
metadata.requestUpdate();
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 63ab680..85d8db8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -93,6 +94,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1726,7 +1728,7 @@ public class SenderTest {
assertFalse(batchIterator.hasNext());
assertEquals(expectedSequence, firstBatch.baseSequence());
return true;
- }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset));
+ }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset, null));
}
@Test
@@ -2129,7 +2131,7 @@ public class SenderTest {
time.sleep(deliveryTimeoutMs / 2); // expire the first batch only
- client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L));
+ client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L, null));
sender.runOnce(); // receive response (offset=0)
assertEquals(0, client.inFlightRequestCount());
assertEquals(0, sender.inFlightBatches(tp0).size());
@@ -2432,6 +2434,29 @@ public class SenderTest {
sender.runOnce();
}
+ @Test
+ public void testDefaultErrorMessage() throws Exception {
+ verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0), Errors.INVALID_REQUEST.message());
+ }
+
+ @Test
+ public void testCustomErrorMessage() throws Exception {
+ String errorMessage = "testCustomErrorMessage";
+ verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1, errorMessage), errorMessage);
+ }
+
+ private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
+ Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
+ sender.runOnce(); // connect
+ sender.runOnce(); // send produce request
+ client.respond(response);
+ sender.runOnce();
+ sender.runOnce();
+ ExecutionException e1 = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
+ assertEquals(InvalidRequestException.class, e1.getCause().getClass());
+ assertEquals(expectedMessage, e1.getCause().getMessage());
+ }
+
class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
private TransactionResult requiredResult;
@@ -2527,8 +2552,9 @@ public class SenderTest {
null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
}
- private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
- ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
+ private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
+ ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset,
+ RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
return new ProduceResponse(partResp, throttleTimeMs);
}
@@ -2544,7 +2570,7 @@ public class SenderTest {
}
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
- return produceResponse(tp, offset, error, throttleTimeMs, -1L);
+ return produceResponse(tp, offset, error, throttleTimeMs, -1L, null);
}
private TransactionManager createTransactionManager() {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 4e44f34..3b91c2c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -98,10 +98,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val producer = createProducer(brokerList = brokerList)
try {
- producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ val e = intercept[ExecutionException] {
+ producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ }.getCause
+ assertTrue(e.isInstanceOf[InvalidTimestampException])
+ assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
} finally {
producer.close()
}
@@ -109,10 +110,11 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// Test compressed messages.
val compressedProducer = createProducer(brokerList = brokerList, compressionType = "gzip")
try {
- compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ val e = intercept[ExecutionException] {
+ compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ }.getCause
+ assertTrue(e.isInstanceOf[InvalidTimestampException])
+ assertEquals("One or more records have been rejected due to invalid timestamp", e.getMessage)
} finally {
compressedProducer.close()
}