You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/11 15:14:15 UTC

[GitHub] [pulsar] yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema

yittg commented on a change in pull request #5165: [PIP-43] Support producer to send msg with different schema
URL: https://github.com/apache/pulsar/pull/5165#discussion_r323301079
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
 ##########
 @@ -430,6 +438,95 @@ public void sendAsync(Message<T> message, SendCallback callback) {
         }
     }
 
+    private boolean fillMessageSchema(MessageMetadata.Builder msgMetadataBuilder,
+                                      Schema msgSchema,
+                                      SendCallback callback) {
+        if (msgSchema == schema) {
+            schemaVersion.ifPresent(v -> msgMetadataBuilder.setSchemaVersion(ByteString.copyFrom(v)));
+            return true;
+        }
+        byte[] schemaVersion;
+        try {
+            schemaVersion = schemaCache.computeIfAbsent(
+                    SchemaHash.of(msgSchema), (hash) -> {
+                        SchemaInfo schemaInfo = Optional.ofNullable(msgSchema)
+                                                        .map(Schema::getSchemaInfo)
+                                                        .filter(si -> si.getType().getValue() > 0)
+                                                        .orElse(Schema.BYTES.getSchemaInfo());
+                        try {
+                            return getOrCreateSchemaAsync(schemaInfo).get();
+                        } catch (Throwable t) {
+                            log.warn("[{}][{}] GetOrCreateSchema error", topic, producerName, t);
+                            callback.sendComplete(PulsarClientException.unwrap(t));
+                            throw new RuntimeException(t);
+                        }
+                    });
+        } catch (RuntimeException e) {
+            return false;
+        }
+        conf.setMultiSchemaEnabled(true);
 
 Review comment:
   @congbobo184 thanks for your reply. Here enable multiple schema automatically, it works on some path like checking whether messages can be added to a same batch: it is not needed to check whether they have same schema if multiple schema not enabled.
   i'll make it more clear in following commit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services