You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/08/13 23:25:41 UTC

[pulsar] branch master updated: [tools] Pulsar Client: add ability to produce KV messages (#11303)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17ab040  [tools] Pulsar Client: add ability to produce KV messages (#11303)
17ab040 is described below

commit 17ab040bbd71161df3860613ddcd253b3af24f59
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Sat Aug 14 01:24:45 2021 +0200

    [tools] Pulsar Client: add ability to produce KV messages (#11303)
    
    ### Motivation
    
    Currently (Pulsar 2.8.0) it is not easy to produce messages with KeyValue encoding, because command line tools do not provide such support.
    With this change the user will be able to set the schema while using `pulsar-client produce`
    
    We are adding three parameters:
    * "--key-schema" : this is the schema for the Key (default :"string")
    * "--value-schema": this is the schema for the Value (default: "bytes")
    * "--key-value-encoding-type": this is the type of encoding, with values: separated, inline
    When --key-value-encoding-type is not set (default behaviour) we are not using KV encoding
    
    The command is 100% compatible with previous versions
    
    ### Modifications
    
    Add support for the properties listed above.
    We are using "Schema.AUTO_PRODUCE_BYTES" in order to deal with the Schema registry.
    The user will pass the raw value as message and we are passing it without modifications to Pulsar.
    
    Example command to send a KV message with JSON key and value:
    `bin/pulsar-client produce --key-value-encoding-type separated -k '{"a":"b"}' -m '{"a":"b"}' --key-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} '  --value-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} ' test`
    
    For AVRO and JSON the schema definition is written inline after the "avro:" or "json:" prefix.
    
    ### Verifying this change
    
    This change adds unit tests
---
 .../org/apache/pulsar/client/cli/CmdProduce.java   | 106 +++++++++++++++++++--
 .../apache/pulsar/client/cli/TestCmdProduce.java   |  37 ++++++-
 2 files changed, 133 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 326e221..52d0b25 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.JsonParseException;
 
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -38,12 +39,12 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
@@ -51,8 +52,13 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -76,6 +82,9 @@ public class CmdProduce {
 
     private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
     private static final int MAX_MESSAGES = 1000;
+    static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = ""; // option not set: no key-value encoding
+    private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED = "separated"; // accepted CLI value
+    private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline"; // accepted CLI value
 
     @Parameter(description = "TopicName", required = true)
     private List<String> mainOptions;
@@ -114,6 +123,15 @@ public class CmdProduce {
     @Parameter(names = { "-k", "--key"}, description = "message key to add ")
     private String key;
 
+    @Parameter(names = { "-vs", "--value-schema"}, description = "Schema type (bytes, string, avro:<schema> or json:<schema>)")
+    private String valueSchema = "bytes";
+
+    @Parameter(names = { "-ks", "--key-schema"}, description = "Schema type (bytes, string, avro:<schema> or json:<schema>)")
+    private String keySchema = "string";
+
+    @Parameter(names = { "-kvet", "--key-value-encoding-type"}, description = "Key Value Encoding Type (it can be separated or inline)")
+    private String keyValueEncodingType = null;
+
     @Parameter(names = { "-ekn", "--encryption-key-name" }, description = "The public key name to encrypt payload")
     private String encKeyName = null;
 
@@ -190,6 +208,18 @@ public class CmdProduce {
             throw (new ParameterException("Please supply message content with either --messages or --files"));
         }
 
+        if (keyValueEncodingType == null) {
+            keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET;
+        } else {
+            switch (keyValueEncodingType) {
+                case KEY_VALUE_ENCODING_TYPE_SEPARATED:
+                case KEY_VALUE_ENCODING_TYPE_INLINE:
+                    break;
+                default:
+                    throw (new ParameterException("--key-value-encoding-type "+keyValueEncodingType+" is not valid, only 'separated' or 'inline'"));
+            }
+        }
+
         int totalMessages = (messages.size() + messageFileNames.size()) * numTimesProduce;
         if (totalMessages > MAX_MESSAGES) {
             String msg = "Attempting to send " + totalMessages + " messages. Please do not send more than "
@@ -212,7 +242,8 @@ public class CmdProduce {
 
         try {
             PulsarClient client = clientBuilder.build();
-            ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic);
+            Schema<?> schema = buildSchema(this.keySchema, this.valueSchema, this.keyValueEncodingType);
+            ProducerBuilder<?> producerBuilder = client.newProducer(schema).topic(topic);
             if (this.chunkingAllowed) {
                 producerBuilder.enableChunking(true);
                 producerBuilder.enableBatching(false);
@@ -221,7 +252,7 @@ public class CmdProduce {
                 producerBuilder.addEncryptionKey(this.encKeyName);
                 producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
             }
-            Producer<byte[]> producer = producerBuilder.create();
+            Producer<?> producer = producerBuilder.create();
 
             List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
             RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
@@ -238,17 +269,33 @@ public class CmdProduce {
                         limiter.acquire();
                     }
 
-                    TypedMessageBuilder<byte[]> message = producer.newMessage();
+                    TypedMessageBuilder message = producer.newMessage();
 
                     if (!kvMap.isEmpty()) {
                         message.properties(kvMap);
                     }
 
-                    if (key != null && !key.isEmpty()) {
-                        message.key(key);
+                    switch (keyValueEncodingType) {
+                        case KEY_VALUE_ENCODING_TYPE_NOT_SET:
+                            if (key != null && !key.isEmpty()) {
+                                message.key(key);
+                            }
+                            message.value(content);
+                            break;
+                        case KEY_VALUE_ENCODING_TYPE_SEPARATED:
+                        case KEY_VALUE_ENCODING_TYPE_INLINE:
+                            KeyValue kv = new KeyValue<>(
+                                    // TODO: support AVRO encoded key
+                                    key != null ? key.getBytes(StandardCharsets.UTF_8) : null,
+                                    content);
+                            message.value(kv);
+                            break;
+                        default:
+                            throw new IllegalStateException();
                     }
 
-                    message.value(content).send();
+                    message.send();
+
 
                     numMessagesSent++;
                 }
@@ -265,6 +312,88 @@ public class CmdProduce {
         return returnCode;
     }
 
+    /**
+     * Builds the producer schema from the command line options.
+     *
+     * @param keySchema            schema type of the key, only used when a key-value encoding type is set
+     * @param schema               schema type of the value, or of the whole message when no key-value
+     *                             encoding type is set
+     * @param keyValueEncodingType {@link #KEY_VALUE_ENCODING_TYPE_NOT_SET} (or {@code null}),
+     *                             {@code "separated"} or {@code "inline"}
+     * @return the component schema, or a {@code KeyValue} schema combining the key and value schemas
+     * @throws IllegalArgumentException if the encoding type or one of the schema types is not supported
+     */
+    static Schema<?> buildSchema(String keySchema, String schema, String keyValueEncodingType) {
+        if (keyValueEncodingType == null) {
+            // the --key-value-encoding-type option defaults to null, meaning "no key-value encoding"
+            return buildComponentSchema(schema);
+        }
+        switch (keyValueEncodingType) {
+            case KEY_VALUE_ENCODING_TYPE_NOT_SET:
+                return buildComponentSchema(schema);
+            case KEY_VALUE_ENCODING_TYPE_SEPARATED:
+                return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema),
+                        KeyValueEncodingType.SEPARATED);
+            case KEY_VALUE_ENCODING_TYPE_INLINE:
+                return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema),
+                        KeyValueEncodingType.INLINE);
+            default:
+                // e.g. "none" lands here: leave the option unset to disable key-value encoding
+                throw new IllegalArgumentException("Invalid KeyValueEncodingType " + keyValueEncodingType
+                        + ", only 'separated' and 'inline' are supported");
+        }
+    }
+
+    /**
+     * Builds the schema of a single component (key, value or whole message).
+     *
+     * <p>Supported types are {@code "string"}, {@code "bytes"}, {@code "avro:<definition>"} and
+     * {@code "json:<definition>"}, where {@code <definition>} is the schema written inline.
+     * Everything except {@code "bytes"} is wrapped in {@code Schema.AUTO_PRODUCE_BYTES}, so the
+     * raw bytes given on the command line can be sent with the requested schema.
+     *
+     * @param schema the schema type, as passed to --key-schema or --value-schema
+     * @return the component schema
+     * @throws IllegalArgumentException if the schema type is not supported
+     */
+    private static Schema<?> buildComponentSchema(String schema) {
+        Schema<?> base;
+        switch (schema) {
+            case "string":
+                base = Schema.STRING;
+                break;
+            case "bytes":
+                // payloads are already raw bytes, so no AUTO_PRODUCE_BYTES wrapper is needed
+                return Schema.BYTES;
+            default:
+                if (schema.startsWith("avro:")) {
+                    base = buildGenericSchema(SchemaType.AVRO, schema.substring("avro:".length()));
+                } else if (schema.startsWith("json:")) {
+                    base = buildGenericSchema(SchemaType.JSON, schema.substring("json:".length()));
+                } else {
+                    throw new IllegalArgumentException("Invalid schema type: " + schema);
+                }
+        }
+        return Schema.AUTO_PRODUCE_BYTES(base);
+    }
+
+    /**
+     * Builds a generic schema of the given type from an inline schema definition.
+     *
+     * @param type       the schema type; callers in this class pass AVRO or JSON
+     * @param definition the schema definition, encoded as UTF-8
+     * @return the generic schema
+     */
+    private static Schema<?> buildGenericSchema(SchemaType type, String definition) {
+        return Schema.generic(SchemaInfoImpl
+                .builder()
+                .schema(definition.getBytes(StandardCharsets.UTF_8))
+                .name("client")
+                .properties(new HashMap<>())
+                .type(type)
+                .build());
+    }
+
     @SuppressWarnings("deprecation")
     @VisibleForTesting
     public String getProduceBaseEndPoint(String topic) {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
index 173fcfc..17e1e2d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
@@ -18,10 +18,16 @@
  */
 package org.apache.pulsar.client.cli;
 
-import org.testng.Assert;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+
 public class TestCmdProduce {
 
     CmdProduce cmdProduce;
@@ -35,10 +41,48 @@ public class TestCmdProduce {
     @Test
     public void testGetProduceBaseEndPoint() {
         String topicNameV1 = "persistent://public/cluster/default/issue-11067";
-        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
+        assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
                 "ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067");
         String topicNameV2 = "persistent://public/default/issue-11067";
-        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
+        assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
                 "ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067");
     }
+
+    @Test
+    public void testBuildSchema() {
+        String recordSchema = "{\"type\": \"record\",\"namespace\": \"com.example\","
+                + "\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\", \"type\": \"string\" }]}";
+
+        // default: no key-value encoding, the value schema is used as-is
+        assertEquals(CmdProduce.buildSchema("string", "bytes", CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET)
+                .getSchemaInfo().getType(), SchemaType.BYTES);
+
+        // simple key value (TestNG order: actual first, expected second)
+        assertEquals(CmdProduce.buildSchema("string", "string", "separated").getSchemaInfo().getType(),
+                SchemaType.KEY_VALUE);
+        assertEquals(CmdProduce.buildSchema("string", "string", "inline").getSchemaInfo().getType(),
+                SchemaType.KEY_VALUE);
+
+        KeyValueSchema<?, ?> composite1 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema("string",
+                "json:" + recordSchema, "inline");
+        assertEquals(composite1.getKeyValueEncodingType(), KeyValueEncodingType.INLINE);
+        assertEquals(composite1.getKeySchema().getSchemaInfo().getType(), SchemaType.STRING);
+        assertEquals(composite1.getValueSchema().getSchemaInfo().getType(), SchemaType.JSON);
+
+        KeyValueSchema<?, ?> composite2 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema(
+                "json:" + recordSchema, "avro:" + recordSchema, "inline");
+        assertEquals(composite2.getKeyValueEncodingType(), KeyValueEncodingType.INLINE);
+        assertEquals(composite2.getKeySchema().getSchemaInfo().getType(), SchemaType.JSON);
+        assertEquals(composite2.getValueSchema().getSchemaInfo().getType(), SchemaType.AVRO);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testBuildSchemaRejectsUnknownEncodingType() {
+        CmdProduce.buildSchema("string", "bytes", "unknown");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testBuildSchemaRejectsUnknownSchemaType() {
+        CmdProduce.buildSchema("string", "xml", CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET);
+    }
 }
\ No newline at end of file