You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/11 10:17:05 UTC

[pulsar] branch master updated: Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 117321d  Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
117321d is described below

commit 117321d7df0d508daa795cccd1adeda7e350ff9d
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Mon Mar 11 03:16:58 2019 -0700

    Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
    
    **Motivation**
    
    Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG  #1090
    
    Previously `ProducerBuilder.sendTimeout` was set by parsing Kafka's `ProducerConfig.MAX_BLOCK_MS_CONFIG`.
    According to Kafka's [document](https://kafka.apache.org/20/documentation.html) it's for
    
    > Controlling how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block either because the buffer is full or metadata unavailable.
    
    But `ProducerBuilder.sendTimeout`  is for
    
    > If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
    
    And Kafka's `ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG`, according to the document is for:
    > Controlling the maximum amount of time the client will wait for the response of a request.
    
    Which I think would be better fit purpose of Pulsar's `ProducerBuilder.sendTimeout`.
---
 .../apache/kafka/clients/producer/PulsarKafkaProducer.java   |  4 ++--
 .../kafka/clients/producer/PulsarKafkaProducerTest.java      | 12 +++++++++++-
 site2/docs/adaptors-kafka.md                                 |  2 +-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7309c69..4906cf5 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -119,7 +119,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
         try {
             // Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
-            // If passed in value is greater than Integer.MAX_VALUE in second will throw ArithmeticException.
+            // If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
             int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
             client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS).build();
         } catch (ArithmeticException e) {
@@ -146,7 +146,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
         pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
 
-        int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"));
+        int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
         pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
 
         boolean blockOnBufferFull = Boolean
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 6bc99e7..37ff221 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -63,6 +63,13 @@ public class PulsarKafkaProducerTest {
     public void testPulsarKafkaProducer() {
         ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
         ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+                return mockProducerBuilder;
+            }
+        }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
         doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
         doAnswer(new Answer() {
             @Override
@@ -83,10 +90,12 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
 
         PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
 
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+        verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
@@ -98,6 +107,7 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
 
-        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+        new PulsarKafkaProducer<>(properties, null, null);
     }
+
 }
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index c3f69e1..924192d 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -152,7 +152,7 @@ Properties:
 | `request.timeout.ms`                    | Ignored   |                                                                               |
 | `retries`                               | Ignored   | Pulsar client retries with exponential backoff until the send timeout expires |
 | `send.buffer.bytes`                     | Ignored   |                                                                               |
-| `timeout.ms`                            | Ignored   |                                                                               |
+| `timeout.ms`                            | Yes       |                                                                               |
 | `value.serializer`                      | Yes       |                                                                               |