Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/08 15:45:29 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #9481: Allow Pulsar IO Sources to push GenericRecord instances encoded with AVRO

congbobo184 commented on a change in pull request #9481:
URL: https://github.com/apache/pulsar/pull/9481#discussion_r572143808



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
##########
@@ -52,4 +67,29 @@ public GenericAvroWriter(Schema schema) {
             this.byteArrayOutputStream.reset();
         }
     }
+
+    /**
+     * This is an adapter from Pulsar GenericRecord to Avro classes.
+     */
+    private class GenericRecordAdapter extends SpecificRecordBase {
+        private GenericRecord message;
+
+        void setCurrentMessage(GenericRecord message) {
+            this.message = message;
+        }
+        @Override
+        public Schema getSchema() {
+            return schema;
+        }
+
+        @Override
+        public Object get(int field) {
+            return message.getField(schema.getFields().get(field).name());
+        }
+
+        @Override
+        public void put(int field, Object value) {
+            throw new UnsupportedOperationException();
+        }
+    }

Review comment:
       The writer we generate here is a `GenericDatumWriter`, which can't write a `SpecificRecordBase`; for that we would need to support `SpecificDatumWriter`. So I don't think this is a good way to do it.
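
       For readers following the thread, here is a minimal sketch of the pattern the adapter in the diff relies on: an index-based `get(int)` that looks up the field name at that position and delegates to a name-keyed record. `IndexedAdapterSketch`, `NamedRecord`, and `Adapter` are hypothetical stand-ins written only with the JDK; this is not the PR's code and it does not use Avro, so it says nothing about which Avro writer would accept such an adapter.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IndexedAdapterSketch {

    /** Hypothetical stand-in for a name-keyed record (the role GenericRecord plays in the diff). */
    interface NamedRecord {
        Object getField(String name);
    }

    /** Hypothetical read-only, index-keyed view over whichever record is currently set. */
    static final class Adapter {
        // Field order, as a schema would define it (an assumption of this sketch).
        private final List<String> fieldNames;
        private NamedRecord current;

        Adapter(List<String> fieldNames) {
            this.fieldNames = List.copyOf(fieldNames);
        }

        void setCurrent(NamedRecord record) {
            this.current = record;
        }

        // Translate a positional lookup into a lookup by field name.
        Object get(int index) {
            if (current == null) {
                throw new IllegalStateException("no current record set");
            }
            return current.getField(fieldNames.get(index));
        }

        // Writes are rejected, mirroring the put(...) override in the diff.
        void put(int index, Object value) {
            throw new UnsupportedOperationException("adapter is read-only");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("id", 7);
        values.put("name", "pulsar");

        Adapter adapter = new Adapter(List.of("id", "name"));
        adapter.setCurrent(values::get);

        // Print each field by position.
        for (int i = 0; i < 2; i++) {
            System.out.println(i + " -> " + adapter.get(i));
        }
    }
}
```

       Reusing one adapter and swapping the current record, as the diff does with `setCurrentMessage`, avoids allocating a wrapper per message, at the cost of the adapter not being safe to share across threads.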

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
##########
@@ -196,6 +197,9 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
         String schemaTypeOrClassName = conf.getSchemaType();
         if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
             // No preferred schema was provided, auto-discover schema or fallback to defaults
+            if (!input && clazz.equals(GenericRecord.class)) {
+                return new AutoProduceBytesSchema();
+            }

Review comment:
       It seems the integration test doesn't cover this path; it also uses `record.getSchema()`. What is the integration test actually testing? I don't understand this part.



