You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/04 08:12:10 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #9448: Pulsar IO - KafkaSource - allow to manage Avro Encoded messages

eolivelli commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r569527016



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -365,6 +365,17 @@ public void addCallback(MessageImpl<?> msg, SendCallback scb) {
         }
     }
 
+    /**
+     * Compress the payload if compression is configured
+     * @param payload
+     * @return a new payload
+     */
+    private ByteBuf applyCompression(ByteBuf payload) {

Review comment:
       Changes to ProducerImpl are part of https://github.com/apache/pulsar/pull/9396.
   I will revert them as soon as #9396 is committed.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3854,4 +3857,49 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
             Assert.assertEquals(size, 0);
         });
     }
+
+
+    @Data
+    @EqualsAndHashCode
+    public static class MyBean {
+        private String field;
+    }
+
+    @DataProvider(name = "enableBatching")
+    public static Object[] isEnableBatching() {
+        return new Object[]{false, true};
+    }
+
+    @Test(dataProvider = "enableBatching")
+    public void testSendCompressedWithDeferredSchemaSetup(boolean enableBatching) throws Exception {

Review comment:
       This is part of https://github.com/apache/pulsar/pull/9396.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -18,32 +18,47 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaWriter;
 
 import java.io.ByteArrayOutputStream;
-
+@Slf4j
 public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
 
     private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer;
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
+    private final Schema schema;
+    private final GenericRecordAdapter adapter;
 
     public GenericAvroWriter(Schema schema) {
+        this.schema = schema;
         this.writer = new GenericDatumWriter<>(schema);
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
+        this.adapter = new GenericRecordAdapter();
     }
 
     @Override
     public synchronized byte[] write(GenericRecord message) {
         try {
-            writer.write(((GenericAvroRecord)message).getAvroRecord(), this.encoder);
+            if (message instanceof GenericAvroRecord) {
+                writer.write(((GenericAvroRecord) message).getAvroRecord(), this.encoder);
+            } else {
+                adapter.setCurrentMessage(message);

Review comment:
       @sijie here we are not extending the Pulsar Schema API; we are simply allowing this method to work with any implementation of GenericRecord.

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
##########
@@ -385,28 +388,43 @@ public void close() throws Exception {
         }
     }
 
+    @AllArgsConstructor
+    static class InitSchemaResult<T> {
+        final Schema<T> schema;
+        final boolean requireSink;
+    }
+
     @SuppressWarnings("unchecked")
     @VisibleForTesting
-    Schema<T> initializeSchema() throws ClassNotFoundException {
+    InitSchemaResult<T> initializeSchema() throws ClassNotFoundException {
         if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
-            return (Schema<T>) Schema.BYTES;
+            return new InitSchemaResult((Schema<T>) Schema.BYTES, true);
         }
 
         Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
         if (Void.class.equals(typeArg)) {
             // return type is 'void', so there's no schema to check
-            return null;
+            log.info("typeClassName is {}, no need to force a schema", this.pulsarSinkConfig.getTypeClassName());
+            return new InitSchemaResult(null, false);
+        }
+        if (GenericRecord.class.equals(typeArg)) {

Review comment:
       With this change, Pulsar Sources are allowed to produce GenericRecord.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org