You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/08 22:06:27 UTC

[pulsar] branch master updated: [Message Routing] Set CustomPartition implicitly when messageRouter is set (#3126)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b6f679  [Message Routing] Set CustomPartition implicitly when messageRouter is set (#3126)
5b6f679 is described below

commit 5b6f6794ff6b1dff3b0ea87be6969d947960b8dc
Author: Eren Avsarogullari <er...@gmail.com>
AuthorDate: Sat Dec 8 22:06:23 2018 +0000

    [Message Routing] Set CustomPartition implicitly when messageRouter is set (#3126)
    
    ### Motivation
    This PR proposes some refactoring and fixes on message routing logic.
    
    ### Modifications
    - `getMessageRouter` logic is refactored on`PartitionedProducerImpl`.
    - If `messageRouter` is set through `ProducerBuilderImpl`, `messageRoutingMode` needs to be set as `MessageRoutingMode.CustomPartition` implicitly. Default is `MessageRoutingMode.RoundRobinPartition`
    - Javadoc is fixed
    - Custom Message Router Documentation is fixed
    - UT coverage is added (`ProducerBuilderImplTest` and `PartitionedProducerConsumerTest`)
---
 .../api/PartitionedProducerConsumerTest.java       |  59 ++++++++-
 .../pulsar/client/api/MessageRoutingMode.java      |   2 +-
 .../client/impl/PartitionedProducerImpl.java       |   8 +-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  21 +++-
 .../impl/conf/ProducerConfigurationData.java       |   2 +-
 .../client/impl/ProducerBuilderImplTest.java       | 136 +++++++++++++++++++++
 site2/docs/cookbooks-partitioned.md                |  18 +--
 .../cookbooks-partitioned.md                       |  18 +--
 8 files changed, 238 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 02b46d2..4263f5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -89,7 +89,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message<byte[]> msg;
         Set<String> messageSet = Sets.newHashSet();
         for (int i = 0; i < 10; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
@@ -127,6 +127,55 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
     }
 
     @Test(timeOut = 30000)
+    public void testCustomPartitionProducer() throws Exception {
+        TopicName topicName = null;
+        Producer<byte[]> producer = null;
+        Consumer<byte[]> consumer = null;
+        final int MESSAGE_COUNT = 16;
+        try {
+            log.info("-- Starting {} test --", methodName);
+
+            int numPartitions = 4;
+            topicName = TopicName
+                    .get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis());
+
+            admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
+
+            producer = pulsarClient.newProducer().topic(topicName.toString())
+                    .messageRouter(new AlwaysTwoMessageRouter())
+                    .create();
+
+            consumer = pulsarClient.newConsumer().topic(topicName.toString())
+                    .subscriptionName("my-partitioned-subscriber").subscribe();
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                String message = "my-message-" + i;
+                producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
+            }
+
+            Message<byte[]> msg;
+            Set<String> messageSet = Sets.newHashSet();
+
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                msg = consumer.receive(5, TimeUnit.SECONDS);
+                Assert.assertNotNull(msg, "Message should not be null");
+                consumer.acknowledge(msg);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+            }
+        } finally {
+            producer.close();
+            consumer.unsubscribe();
+            consumer.close();
+            admin.topics().deletePartitionedTopic(topicName.toString());
+
+            log.info("-- Exiting {} test --", methodName);
+        }
+    }
+
+    @Test(timeOut = 30000)
     public void testSinglePartitionProducer() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -147,7 +196,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
             producer.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        Message<byte[]> msg;
         Set<String> messageSet = Sets.newHashSet();
 
         for (int i = 0; i < 10; i++) {
@@ -600,4 +649,10 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
                 Collections.singletonList(nonPartitionedTopic));
     }
 
+    private class AlwaysTwoMessageRouter implements MessageRouter {
+        @Override
+        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+            return 2;
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
index 9723129..b5cd8ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
@@ -36,7 +36,7 @@ public enum MessageRoutingMode {
     RoundRobinPartition,
 
     /**
-     * Use custom message router implemenation that will be called to determine the partition for a particular message.
+     * Use custom message router implementation that will be called to determine the partition for a particular message.
      */
     CustomPartition
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3b87e4e..9daf303 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -47,6 +47,8 @@ import com.google.common.collect.Lists;
 
 public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
+    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);
+
     private List<ProducerImpl<T>> producers;
     private MessageRouter routerPolicy;
     private final ProducerStatsRecorderImpl stats;
@@ -70,12 +72,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         MessageRouter messageRouter;
 
         MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
-        MessageRouter customMessageRouter = conf.getCustomMessageRouter();
 
         switch (messageRouteMode) {
         case CustomPartition:
-            checkNotNull(customMessageRouter);
-            messageRouter = customMessageRouter;
+            messageRouter = checkNotNull(conf.getCustomMessageRouter());
             break;
         case RoundRobinPartition:
             messageRouter = new RoundRobinPartitionMessageRouterImpl(
@@ -235,8 +235,6 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         return stats;
     }
 
-    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);
-
     public List<ProducerImpl<T>> getProducers() {
         return producers.stream().collect(Collectors.toList());
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 834e0c0..19df2e9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -63,7 +63,6 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
         this.schema = schema;
     }
 
-
     /**
      * Allow to override schema in builder implementation
      * @return
@@ -102,6 +101,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
         }
 
+        try {
+            setMessageRoutingMode();
+        } catch(PulsarClientException pce) {
+            return FutureUtil.failedFuture(pce);
+        }
+
         return interceptorList == null || interceptorList.size() == 0 ?
                 client.createProducerAsync(conf, schema, null) :
                 client.createProducerAsync(conf, schema, new ProducerInterceptors<>(interceptorList));
@@ -242,4 +247,18 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
         interceptorList.addAll(Arrays.asList(interceptors));
         return this;
     }
+
+    private void setMessageRoutingMode() throws PulsarClientException {
+        if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
+            messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        } else if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() != null) {
+            messageRoutingMode(MessageRoutingMode.CustomPartition);
+        } else if((conf.getMessageRoutingMode() == MessageRoutingMode.CustomPartition
+                && conf.getCustomMessageRouter() == null)
+                || (conf.getMessageRoutingMode() != MessageRoutingMode.CustomPartition
+                && conf.getCustomMessageRouter() != null)) {
+            throw new PulsarClientException("When 'messageRouter' is set, 'messageRoutingMode' " +
+                    "should be set as " + MessageRoutingMode.CustomPartition);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 57ca135..611cafb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -50,7 +50,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     private boolean blockIfQueueFull = false;
     private int maxPendingMessages = 1000;
     private int maxPendingMessagesAcrossPartitions = 50000;
-    private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
+    private MessageRoutingMode messageRoutingMode = null;
     private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
 
     private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
new file mode 100644
index 0000000..e436e96
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.mockito.Matchers;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Unit tests of {@link ProducerBuilderImpl}.
+ */
+public class ProducerBuilderImplTest {
+
+    private static final String TOPIC_NAME = "testTopicName";
+    private PulsarClientImpl client;
+    private ProducerBuilderImpl producerBuilderImpl;
+
+    @BeforeTest
+    public void setup() {
+        Producer producer = mock(Producer.class);
+        client = mock(PulsarClientImpl.class);
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        when(client.newProducer()).thenReturn(producerBuilderImpl);
+
+        when(client.createProducerAsync(
+                Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
+                .thenReturn(CompletableFuture.completedFuture(producer));
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .create();
+        assertNotNull(producer);
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenMessageRoutingModeIsSinglePartition() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        assertNotNull(producer);
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenMessageRoutingModeIsRoundRobinPartition() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+        assertNotNull(producer);
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenMessageRoutingIsSetImplicitly() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRouter(new CustomMessageRouter())
+                .create();
+        assertNotNull(producer);
+    }
+
+    @Test
+    public void testProducerBuilderImplWhenMessageRoutingIsCustomPartition() throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new CustomMessageRouter())
+                .create();
+        assertNotNull(producer);
+    }
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testProducerBuilderImplWhenMessageRoutingModeIsSinglePartitionAndMessageRouterIsSet()
+            throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .messageRouter(new CustomMessageRouter())
+                .create();
+    }
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testProducerBuilderImplWhenMessageRoutingModeIsRoundRobinPartitionAndMessageRouterIsSet()
+            throws PulsarClientException {
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .messageRouter(new CustomMessageRouter())
+                .create();
+    }
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testProducerBuilderImplWhenMessageRoutingModeIsCustomPartitionAndMessageRouterIsNotSet()
+            throws PulsarClientException {
+        ProducerBuilderImpl producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .create();
+    }
+
+    private class CustomMessageRouter implements MessageRouter {
+        @Override
+        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+            int partitionIndex = Integer.parseInt(msg.getKey()) % metadata.numPartitions();
+            return partitionIndex;
+        }
+    }
+}
diff --git a/site2/docs/cookbooks-partitioned.md b/site2/docs/cookbooks-partitioned.md
index ca24d94..182299f 100644
--- a/site2/docs/cookbooks-partitioned.md
+++ b/site2/docs/cookbooks-partitioned.md
@@ -30,10 +30,11 @@ Here's an example:
 String pulsarBrokerRootUrl = "pulsar://localhost:6650";
 String topic = "persistent://my-tenant/my-namespace/my-topic";
 
-PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
-ProducerConfiguration config = new ProducerConfiguration();
-config.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.SinglePartition);
-Producer producer = client.createProducer(topic, config);
+PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic(topic)
+        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+        .create();
 producer.send("Partitioned topic message".getBytes());
 ```
 
@@ -63,10 +64,11 @@ With that implementation in hand, you can send
 String pulsarBrokerRootUrl = "pulsar://localhost:6650";
 String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
 
-PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
-ProducerConfiguration config = new ProducerConfiguration();
-config.setMessageRouter(AlwaysTenRouter);
-Producer producer = client.createProducer(topic, config);
+PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic(topic)
+        .messageRouter(new AlwaysTenRouter())
+        .create();
 producer.send("Partitioned topic message".getBytes());
 ```
 
diff --git a/site2/website/versioned_docs/version-2.1.0-incubating/cookbooks-partitioned.md b/site2/website/versioned_docs/version-2.1.0-incubating/cookbooks-partitioned.md
index c6806e3..eaad8ff 100644
--- a/site2/website/versioned_docs/version-2.1.0-incubating/cookbooks-partitioned.md
+++ b/site2/website/versioned_docs/version-2.1.0-incubating/cookbooks-partitioned.md
@@ -31,10 +31,11 @@ Here's an example:
 String pulsarBrokerRootUrl = "pulsar://localhost:6650";
 String topic = "persistent://my-tenant/my-namespace/my-topic";
 
-PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
-ProducerConfiguration config = new ProducerConfiguration();
-config.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.SinglePartition);
-Producer producer = client.createProducer(topic, config);
+PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic(topic)
+        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+        .create();
 producer.send("Partitioned topic message".getBytes());
 ```
 
@@ -64,10 +65,11 @@ With that implementation in hand, you can send
 String pulsarBrokerRootUrl = "pulsar://localhost:6650";
 String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
 
-PulsarClient client = PulsarClient.create(pulsarBrokerRootUrl);
-ProducerConfiguration config = new ProducerConfiguration();
-config.setMessageRouter(AlwaysTenRouter);
-Producer producer = client.createProducer(topic, config);
+PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic(topic)
+        .messageRouter(new AlwaysTenRouter())
+        .create();
 producer.send("Partitioned topic message".getBytes());
 ```