You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/11 10:17:05 UTC
[pulsar] branch master updated: Support Kafka's
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 117321d Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
117321d is described below
commit 117321d7df0d508daa795cccd1adeda7e350ff9d
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Mon Mar 11 03:16:58 2019 -0700
Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
**Motivation**
Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG #1090
Previously `ProducerBuilder.sendTimeout` was set by parsing Kafka's `ProducerConfig.MAX_BLOCK_MS_CONFIG`.
According to Kafka's [document](https://kafka.apache.org/20/documentation.html) it's for
> Controlling how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block either because the buffer is full or metadata unavailable.
But `ProducerBuilder.sendTimeout` is for
> If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
And Kafka's `ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG`, according to the document is for:
> Controlling the maximum amount of time the client will wait for the response of a request.
Which I think would be better fit purpose of Pulsar's `ProducerBuilder.sendTimeout`.
---
.../apache/kafka/clients/producer/PulsarKafkaProducer.java | 4 ++--
.../kafka/clients/producer/PulsarKafkaProducerTest.java | 12 +++++++++++-
site2/docs/adaptors-kafka.md | 2 +-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7309c69..4906cf5 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -119,7 +119,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
try {
// Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
- // If passed in value is greater than Integer.MAX_VALUE in second will throw ArithmeticException.
+ // If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
} catch (ArithmeticException e) {
@@ -146,7 +146,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
- int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"));
+ int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
boolean blockOnBufferFull = Boolean
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 6bc99e7..37ff221 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -63,6 +63,13 @@ public class PulsarKafkaProducerTest {
public void testPulsarKafkaProducer() {
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
+ }
+ }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doAnswer(new Answer() {
@Override
@@ -83,10 +90,12 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+ verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
@@ -98,6 +107,7 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
- PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+ new PulsarKafkaProducer<>(properties, null, null);
}
+
}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index c3f69e1..924192d 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -152,7 +152,7 @@ Properties:
| `request.timeout.ms` | Ignored | |
| `retries` | Ignored | Pulsar client retries with exponential backoff until the send timeout expires |
| `send.buffer.bytes` | Ignored | |
-| `timeout.ms` | Ignored | |
+| `timeout.ms` | Yes | |
| `value.serializer` | Yes | |