You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/11 22:33:05 UTC

[pulsar] branch master updated: improve avro schema deserialization by reusing binary decoder (#3158)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c029b95  improve avro schema deserialization by reusing binary decoder (#3158)
c029b95 is described below

commit c029b957a21f10db28b2c8bf37b631bf01dca6f8
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Dec 11 14:32:59 2018 -0800

    improve avro schema deserialization by reusing binary decoder (#3158)
    
    * improve avro schema deserialization by reusing binary decoder
    
    * change class variable to be private
    
    * fix shading issue
---
 pulsar-client-schema/pom.xml                                 | 10 ----------
 .../org/apache/pulsar/client/impl/schema/AvroSchema.java     | 12 +++++++++++-
 .../java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java |  2 +-
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-schema/pom.xml b/pulsar-client-schema/pom.xml
index 3db0cd2..c984dd3 100644
--- a/pulsar-client-schema/pom.xml
+++ b/pulsar-client-schema/pom.xml
@@ -131,16 +131,11 @@
                                     <include>commons-codec:commons-codec</include>
                                     <include>commons-collections:commons-collections</include>
                                     <include>org.asynchttpclient:*</include>
-                                    <include>io.netty:netty-codec-http</include>
-                                    <include>io.netty:netty-transport-native-epoll</include>
                                     <include>org.reactivestreams:reactive-streams</include>
-                                    <include>com.typesafe.netty:netty-reactive-streams</include>
                                     <include>org.javassist:javassist</include>
                                     <include>com.google.guava:guava</include>
                                     <include>com.google.code.gson:gson</include>
                                     <include>com.fasterxml.jackson.core</include>
-                                    <include>io.netty:netty</include>
-                                    <include>io.netty:netty-all</include>
                                     <include>org.apache.bookkeeper:circe-checksum</include>
                                     <include>com.yahoo.datasketches:sketches-core</include>
                                     <include>org.glassfish.jersey*:*</include>
@@ -150,7 +145,6 @@
                                     <include>com.fasterxml.jackson.*:*</include>
                                     <include>io.grpc:*</include>
                                     <include>com.yahoo.datasketches:*</include>
-                                    <include>io.netty:*</include>
                                     <include>com.squareup.*:*</include>
                                     <include>commons-*:*</include>
                                     <include>org.apache.httpcomponents:*</include>
@@ -211,10 +205,6 @@
                                     <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>io.netty</pattern>
-                                    <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
-                                </relocation>
-                                <relocation>
                                     <pattern>org.apache.pulsar.policies</pattern>
                                     <shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.policies</shadedPattern>
                                 </relocation>
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 6867fdc..a9f1146 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
@@ -45,6 +47,9 @@ public class AvroSchema<T> implements Schema<T> {
     private BinaryEncoder encoder;
     private ByteArrayOutputStream byteArrayOutputStream;
 
+    private static final FastThreadLocal<BinaryDecoder> decoders =
+            new FastThreadLocal<>();
+
     private AvroSchema(org.apache.avro.Schema schema,
                        Map<String, String> properties) {
         this.schema = schema;
@@ -77,7 +82,12 @@ public class AvroSchema<T> implements Schema<T> {
     @Override
     public T decode(byte[] bytes) {
         try {
-            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
+            BinaryDecoder decoderFromCache = decoders.get();
+            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+            if (decoderFromCache == null) {
+                decoders.set(decoder);
+            }
+            return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
         }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index ed3759a..41c2f6f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -40,7 +40,7 @@ public class AvroSchemaHandler implements SchemaHandler {
 
     private final List<PulsarColumnHandle> columnHandles;
 
-    public static final FastThreadLocal<BinaryDecoder> decoders =
+    private static final FastThreadLocal<BinaryDecoder> decoders =
             new FastThreadLocal<>();
 
     private static final Logger log = Logger.get(AvroSchemaHandler.class);