You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/05 07:49:32 UTC

[GitHub] sijie closed pull request #2717: [schema] use AUTO_PRODUCE schema when possible

sijie closed pull request #2717: [schema] use AUTO_PRODUCE schema when possible
URL: https://github.com/apache/pulsar/pull/2717
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index ff85e3436c..f209c483aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -74,7 +75,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
         this.producer = null;
         this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
 
-        this.producerBuilder = client.newProducer() //
+        this.producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
                 .topic(topicName)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .enableBatching(false)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 2f599eeac8..a57889b3ab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -268,7 +269,7 @@ void shutdown() throws Exception {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-            producer = client.newProducer()
+            producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                 .topic(topicName)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -281,7 +282,7 @@ void shutdown() throws Exception {
             this.namespace = dest.getNamespace();
             this.topicName = dest.toString();
             client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-            ProducerBuilder<byte[]> producerBuilder = client.newProducer()
+            ProducerBuilder<byte[]> producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                 .topic(topicName)
                 .enableBatching(batch)
                 .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 5a9a651527..ede4b358fb 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -22,6 +22,7 @@
 
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 
 public class PulsarProducerKafkaConfig {
 
@@ -35,7 +36,7 @@
     public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
 
     public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
-        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer(Schema.AUTO_PRODUCE_BYTES());
 
         if (properties.containsKey(PRODUCER_NAME)) {
             producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 36520a9ee7..c3b9ecc57b 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -23,16 +23,20 @@
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Auto detect schema.
  */
 public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
+    private boolean requireSchemaValidation = true;
     private Schema<T> schema;
 
     public void setSchema(Schema<T> schema) {
         this.schema = schema;
+        this.requireSchemaValidation = SchemaType.BYTES != schema.getSchemaInfo().getType()
+            && SchemaType.NONE != schema.getSchemaInfo().getType();
     }
 
     private void ensureSchemaInitialized() {
@@ -43,8 +47,10 @@ private void ensureSchemaInitialized() {
     public byte[] encode(byte[] message) {
         ensureSchemaInitialized();
 
-        // verify if the message can be decoded by the underlying schema
-        schema.decode(message);
+        if (requireSchemaValidation) {
+            // verify if the message can be decoded by the underlying schema
+            schema.decode(message);
+        }
 
         return message;
     }
@@ -53,8 +59,10 @@ private void ensureSchemaInitialized() {
     public byte[] decode(byte[] bytes) {
         ensureSchemaInitialized();
 
-        // verify the message can be detected by the underlying schema
-        schema.decode(bytes);
+        if (requireSchemaValidation) {
+            // verify the message can be detected by the underlying schema
+            schema.decode(bytes);
+        }
 
         return bytes;
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index c559494ea3..4409e027cc 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,7 +140,9 @@ public int run() throws PulsarClientException {
 
         try {
             PulsarClient client = clientBuilder.build();
-            Producer<byte[]> producer = client.newProducer().topic(topic).create();
+            Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                .topic(topic)
+                .create();
 
             List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames);
             RateLimiter limiter = (this.publishRate > 0) ? RateLimiter.create(this.publishRate) : null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java
index a28acafdc3..185690d709 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousAsyncProducer.java
@@ -23,16 +23,18 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 public class ContinuousAsyncProducer {
     public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic")
-                .blockIfQueueFull(true).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("persistent://my-tenant/my-ns/my-topic")
+            .blockIfQueueFull(true).create();
 
         while (true) {
-            producer.sendAsync("my-message".getBytes());
+            producer.sendAsync("my-message");
         }
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java
index 30ce04f747..1c920c97dd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/ContinuousProducer.java
@@ -23,17 +23,19 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 public class ContinuousProducer {
     public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic")
-                .create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("persistent://my-tenant/my-ns/my-topic")
+            .create();
 
         while (true) {
             try {
-                producer.send("my-message".getBytes());
+                producer.send("my-message");
                 Thread.sleep(1000);
 
             } catch (Exception e) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java
index b4b3f23b9c..61c2fc6572 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducer.java
@@ -31,20 +31,22 @@
 import com.google.common.collect.Lists;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
 public class SampleAsyncProducer {
     public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
 
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic")
-                .sendTimeout(3, TimeUnit.SECONDS).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("persistent://my-tenant/my-ns/my-topic")
+            .sendTimeout(3, TimeUnit.SECONDS).create();
 
         List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
 
         for (int i = 0; i < 10; i++) {
             final String content = "my-message-" + i;
-            CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());
+            CompletableFuture<MessageId> future = producer.sendAsync(content);
 
             future.handle((v, ex) -> {
                 if (ex == null) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java
index 478969289a..83b6172cd9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleCryptoProducer.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
 public class SampleCryptoProducer {
@@ -76,12 +77,13 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://127.0.0.1:8080").build();
 
         // Setup the CryptoKeyReader with the file name where public/private key is kept
-        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-tenant/my-ns/my-topic")
-                .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
-                .addEncryptionKey("myappkey").create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("persistent://my-tenant/my-ns/my-topic")
+            .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
+            .addEncryptionKey("myappkey").create();
 
         for (int i = 0; i < 10; i++) {
-            producer.send("my-message".getBytes());
+            producer.send("my-message");
         }
 
         pulsarClient.close();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
index 467d34cd5b..003818ed42 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleProducer.java
@@ -23,15 +23,17 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 public class SampleProducer {
     public static void main(String[] args) throws PulsarClientException, InterruptedException, IOException {
         PulsarClient client = PulsarClient.builder().serviceUrl("http://localhost:6650").build();
 
-        Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-ns/my-topic").create();
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic("persistent://my-tenant/my-ns/my-topic").create();
 
         for (int i = 0; i < 10; i++) {
-            producer.send("my-message".getBytes());
+            producer.send("my-message");
         }
 
         client.close();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 4d474333b9..cf57cf4422 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -115,8 +115,10 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li
         this.inputTopics = inputTopics;
         this.topicSchema = new TopicSchema(client);
 
-        this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
-                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+        this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer(Schema.AUTO_PRODUCE_BYTES())
+            .blockIfQueueFull(true)
+            .enableBatching(true)
+            .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
 
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
             userConfigs = new HashMap<>();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index f5108fc6c1..4129635878 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.junit.Before;
@@ -63,7 +64,9 @@ public void setup() {
         config.setFunctionDetails(functionDetails);
         logger = mock(Logger.class);
         client = mock(PulsarClientImpl.class);
-        when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
+        AutoProduceBytesSchema<byte[]> autoProduceBytesSchema = (AutoProduceBytesSchema<byte[]>) Schema.AUTO_PRODUCE_BYTES();
+        autoProduceBytesSchema.setSchema(Schema.BYTES);
+        when(client.newProducer(any(Schema.class))).thenReturn(new ProducerBuilderImpl(client, autoProduceBytesSchema));
         when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
                 .thenReturn(CompletableFuture.completedFuture(producer));
         when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services