You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/12/11 22:33:05 UTC
[pulsar] branch master updated: improve avro schema deserialization by reusing binary decoder (#3158)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c029b95 improve avro schema deserialization by reusing binary decoder (#3158)
c029b95 is described below
commit c029b957a21f10db28b2c8bf37b631bf01dca6f8
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Dec 11 14:32:59 2018 -0800
improve avro schema deserialization by reusing binary decoder (#3158)
* improve avro schema deserialization by reusing binary decoder
* change class variable to be private
* fix shading issue
---
pulsar-client-schema/pom.xml | 10 ----------
.../org/apache/pulsar/client/impl/schema/AvroSchema.java | 12 +++++++++++-
.../java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java | 2 +-
3 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-schema/pom.xml b/pulsar-client-schema/pom.xml
index 3db0cd2..c984dd3 100644
--- a/pulsar-client-schema/pom.xml
+++ b/pulsar-client-schema/pom.xml
@@ -131,16 +131,11 @@
<include>commons-codec:commons-codec</include>
<include>commons-collections:commons-collections</include>
<include>org.asynchttpclient:*</include>
- <include>io.netty:netty-codec-http</include>
- <include>io.netty:netty-transport-native-epoll</include>
<include>org.reactivestreams:reactive-streams</include>
- <include>com.typesafe.netty:netty-reactive-streams</include>
<include>org.javassist:javassist</include>
<include>com.google.guava:guava</include>
<include>com.google.code.gson:gson</include>
<include>com.fasterxml.jackson.core</include>
- <include>io.netty:netty</include>
- <include>io.netty:netty-all</include>
<include>org.apache.bookkeeper:circe-checksum</include>
<include>com.yahoo.datasketches:sketches-core</include>
<include>org.glassfish.jersey*:*</include>
@@ -150,7 +145,6 @@
<include>com.fasterxml.jackson.*:*</include>
<include>io.grpc:*</include>
<include>com.yahoo.datasketches:*</include>
- <include>io.netty:*</include>
<include>com.squareup.*:*</include>
<include>commons-*:*</include>
<include>org.apache.httpcomponents:*</include>
@@ -211,10 +205,6 @@
<shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
</relocation>
<relocation>
- <pattern>io.netty</pattern>
- <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
- </relocation>
- <relocation>
<pattern>org.apache.pulsar.policies</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.pulsar.policies</shadedPattern>
</relocation>
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 6867fdc..a9f1146 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema;
+import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
@@ -45,6 +47,9 @@ public class AvroSchema<T> implements Schema<T> {
private BinaryEncoder encoder;
private ByteArrayOutputStream byteArrayOutputStream;
+ private static final FastThreadLocal<BinaryDecoder> decoders =
+ new FastThreadLocal<>();
+
private AvroSchema(org.apache.avro.Schema schema,
Map<String, String> properties) {
this.schema = schema;
@@ -77,7 +82,12 @@ public class AvroSchema<T> implements Schema<T> {
@Override
public T decode(byte[] bytes) {
try {
- return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
+ BinaryDecoder decoderFromCache = decoders.get();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+ if (decoderFromCache == null) {
+ decoders.set(decoder);
+ }
+ return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index ed3759a..41c2f6f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -40,7 +40,7 @@ public class AvroSchemaHandler implements SchemaHandler {
private final List<PulsarColumnHandle> columnHandles;
- public static final FastThreadLocal<BinaryDecoder> decoders =
+ private static final FastThreadLocal<BinaryDecoder> decoders =
new FastThreadLocal<>();
private static final Logger log = Logger.get(AvroSchemaHandler.class);