You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/12/09 08:42:25 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #4847: Pulsar sql avro support schema version

codelipenghui commented on a change in pull request #4847: Pulsar sql avro support schema version
URL: https://github.com/apache/pulsar/pull/4847#discussion_r355316081
 
 

 ##########
 File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
 ##########
 @@ -18,87 +18,92 @@
  */
 package org.apache.pulsar.sql.presto;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.airlift.log.Logger;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.io.IOException;
+
 import java.util.List;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
 
 /**
  * Schema handler for payload in the Avro format.
  */
 public class AvroSchemaHandler implements SchemaHandler {
 
-    private final DatumReader<GenericRecord> datumReader;
-
     private final List<PulsarColumnHandle> columnHandles;
 
-    private static final FastThreadLocal<BinaryDecoder> decoders =
-            new FastThreadLocal<>();
+    private final GenericAvroSchema genericAvroSchema;
+
+    private final SchemaInfo schemaInfo;
 
     private static final Logger log = Logger.get(AvroSchemaHandler.class);
 
-    public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
-        this.datumReader = new GenericDatumReader<>(schema);
+    AvroSchemaHandler(TopicName topicName,
+                      PulsarConnectorConfig pulsarConnectorConfig,
+                      SchemaInfo schemaInfo,
+                      List<PulsarColumnHandle> columnHandles) throws PulsarClientException {
+        this.schemaInfo = schemaInfo;
+        this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
+        this.genericAvroSchema
+                .setSchemaInfoProvider(
+                        new PulsarSqlSchemaInfoProvider(topicName, pulsarConnectorConfig.getPulsarAdmin()));
         this.columnHandles = columnHandles;
     }
 
-    @Override
-    public Object deserialize(ByteBuf payload) {
-
-        ByteBuf heapBuffer = null;
-        try {
-            BinaryDecoder decoderFromCache = decoders.get();
-
-            // Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
-            int size = payload.readableBytes();
-            heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
-            heapBuffer.writeBytes(payload);
+    AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
+                      SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
+        this.schemaInfo = schemaInfo;
+        this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
+        this.genericAvroSchema.setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
+        this.columnHandles = columnHandles;
+    }
 
-            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
-                    heapBuffer.readableBytes(), decoderFromCache);
-            if (decoderFromCache == null) {
-                decoders.set(decoder);
-            }
-            return this.datumReader.read(null, decoder);
-        } catch (IOException e) {
-            log.error(e);
-        } finally {
-            ReferenceCountUtil.safeRelease(heapBuffer);
+    @Override
+    public Object deserialize(RawMessage rawMessage) {
 
 Review comment:
  You can just add a method deserialize(ByteBuf byteBuf, byte[] schemaVersion) instead of using RawMessage as an input param.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services