You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2019/06/13 02:26:18 UTC

[pulsar] branch master updated: fix issue 4347 [ClientAPI]ReaderBuilder.loadConf() not working (#4382)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a9dbab  fix issue 4347 [ClientAPI]ReaderBuilder.loadConf() not working (#4382)
5a9dbab is described below

commit 5a9dbab613d2262948f67f2f02c1c1d74608c4fb
Author: wpl <12...@qq.com>
AuthorDate: Thu Jun 13 10:26:12 2019 +0800

    fix issue 4347 [ClientAPI]ReaderBuilder.loadConf() not working (#4382)
---
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  2 ++
 .../client/impl/conf/ReaderConfigurationData.java  |  3 +++
 .../apache/pulsar/client/impl/BuildersTest.java    | 28 ++++++++++++++++++++++
 3 files changed, 33 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index d74dc83..9c28e03 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -96,7 +96,9 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
 
     @Override
     public ReaderBuilder<T> loadConf(Map<String, Object> config) {
+        MessageId startMessageId = conf.getStartMessageId();
         conf = ConfigurationDataUtils.loadData(config, conf, ReaderConfigurationData.class);
+        conf.setStartMessageId(startMessageId);
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 362c8f9..6645c9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.conf;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.io.Serializable;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -31,6 +32,8 @@ import lombok.Data;
 public class ReaderConfigurationData<T> implements Serializable, Cloneable {
 
     private String topicName;
+
+    @JsonIgnore
     private MessageId startMessageId;
 
     private int receiverQueueSize = 1000;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index 4995706..0bb68c0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -21,7 +21,12 @@ package org.apache.pulsar.client.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.testng.annotations.Test;
 
 public class BuildersTest {
@@ -69,4 +74,27 @@ public class BuildersTest {
         assertEquals(builder.conf.isUseTls(), true);
         assertEquals(builder.conf.getServiceUrl(), "pulsar+ssl://service:6650");
     }
+
+    @Test
+    public void readerBuilderLoadConfTest() throws Exception {
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+        String topicName = "test_src";
+        MessageId messageId = new MessageIdImpl(1, 2, 3);
+        Map<String, Object> config = new HashMap<>();
+        config.put("topicName", topicName);
+        config.put("receiverQueueSize", 2000);
+        ReaderBuilderImpl<byte[]> builder = (ReaderBuilderImpl<byte[]>) client.newReader()
+            .startMessageId(messageId)
+            .loadConf(config);
+
+        Class<?> clazz = builder.getClass();
+        Field conf = clazz.getDeclaredField("conf");
+        conf.setAccessible(true);
+        Object obj = conf.get(builder);
+        assertTrue(obj instanceof ReaderConfigurationData);
+        if (obj instanceof ReaderConfigurationData) {
+            assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName);
+            assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId);
+        }
+    }
 }