Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/10 02:38:36 UTC

[GitHub] [pulsar] rdhabalia opened a new pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

rdhabalia opened a new pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184


   ### Motivation
   The Pulsar client library should provide an API that lets the application access the message payload from pooled buffers. The library must also provide an associated release API to release and deallocate the pooled buffers used by a message.
   
   ### Modification
   - add support for pooled messages in the consumer
   - add the `Message::release()` API
   - add pooled-message support to client-tools for testing
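
To make the acquire/release contract described above concrete, here is a minimal JDK-only model. It is a sketch under stated assumptions, not the Pulsar client implementation: `BufferPool`, `PooledMessage`, `wrap`, `giveBack` and `available` are hypothetical names, the pool is not thread-safe, and a plain direct `ByteBuffer` stands in for the pooled buffer that the real client would hold.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical model of a buffer pool; not the Pulsar client implementation.
final class BufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    BufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Copies the payload into a pooled direct buffer and hands out a message that owns it. */
    PooledMessage wrap(byte[] payload) {
        if (payload.length > bufferSize) {
            throw new IllegalArgumentException("payload exceeds buffer size");
        }
        // Reuse a returned buffer when one is available, otherwise allocate a new one.
        ByteBuffer buf = free.isEmpty() ? ByteBuffer.allocateDirect(bufferSize) : free.pop();
        buf.clear();
        buf.put(payload);
        buf.flip();
        return new PooledMessage(this, buf);
    }

    void giveBack(ByteBuffer buf) {
        free.push(buf);
    }

    int available() {
        return free.size();
    }
}

// Hypothetical message that must be released once the application is done with it.
final class PooledMessage {
    private final BufferPool pool;
    private ByteBuffer payload;

    PooledMessage(BufferPool pool, ByteBuffer payload) {
        this.pool = pool;
        this.payload = payload;
    }

    /** Payload size in bytes. */
    int size() {
        return payload().remaining();
    }

    /** Read-only view of the payload; fails once the message has been released. */
    ByteBuffer payload() {
        if (payload == null) {
            throw new IllegalStateException("message already released");
        }
        return payload.asReadOnlyBuffer();
    }

    /** Returns the buffer to the pool; calling it again is a no-op. */
    void release() {
        if (payload != null) {
            pool.giveBack(payload);
            payload = null;
        }
    }
}
```

The point of the model is the ownership rule: once `release()` has been called the buffer may be handed to another message, so the application must not touch the payload afterwards.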


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] rdhabalia commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r616916831



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
##########
@@ -122,7 +123,9 @@
     @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
     private String schematype = "bytes";
 
-    
+    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+    private boolean poolMessages = false;

Review comment:
       I changed the default value to true. Let's keep this configuration for one release: on a client host with little memory, users might prefer heap memory, because GC can free some space when the internal queue size is not tuned correctly and all the memory is used.







[GitHub] [pulsar] rdhabalia commented on pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#issuecomment-823488910


   @eolivelli @merlimat addressed the feedback.





[GitHub] [pulsar] eolivelli commented on pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#issuecomment-823500624


   I have already approved.
   Awesome work





[GitHub] [pulsar] merlimat commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611078422



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
##########
@@ -122,7 +123,9 @@
     @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
     private String schematype = "bytes";
 
-    
+    @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+    private boolean poolMessages = false;

Review comment:
       I'd say not to make it configurable for the tool. Just enable it always.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -320,6 +393,14 @@ public boolean publishedEarlierThan(long timestamp) {
         }
     }
 
+    @Override
+    public int size() {
+        if (msgMetadata.isNullValue()) {
+            return 0;
+        }
+        return poolMessage ? payload.readableBytes() : getData().length;

Review comment:
       Shouldn't `payload.readableBytes()` work in both cases?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -93,93 +98,156 @@
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
                 Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
-        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0);
+        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0, false);
     }
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
-        this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
-        this.messageId = messageId;
-        this.topic = topic;
-        this.cnx = cnx;
-        this.redeliveryCount = redeliveryCount;
+                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+                boolean pooledMessage) {
+        this.msgMetadata = new MessageMetadata();
+        init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
+    }
 
-        // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
-        // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
-        // backed by a direct buffer which we could not expose as a byte[]
-        this.payload = Unpooled.copiedBuffer(payload);
-        this.encryptionCtx = encryptionCtx;
+    public static <T> MessageImpl<T> create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean pooledMessage) {
+        if (pooledMessage) {
+            @SuppressWarnings("unchecked")
+            MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+            init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount,
+                    pooledMessage);
+            return msg;
+        } else {
+            return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
+                    redeliveryCount, pooledMessage);
+        }
+    }
+
+    static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean pooledMessage) {
+        msg.msgMetadata.clear();
+        msg.msgMetadata.copyFrom(msgMetadata);
+        msg.messageId = messageId;
+        msg.topic = topic;
+        msg.cnx = cnx;
+        msg.redeliveryCount = redeliveryCount;
+
+        msg.poolMessage = pooledMessage;
+        if (pooledMessage) {
+            msg.payload = payload;
+            payload.retain();

Review comment:
       ```suggestion
               msg.payload = payload.retain();
   ```

##########
File path: pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -167,6 +172,9 @@
     
         @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
         public boolean batchIndexAck = false;
+
+        @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
+        private boolean poolMessages = false;

Review comment:
       Again, for tools, I'd keep it always enabled.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
##########
@@ -77,8 +77,15 @@ public ByteBuffer decode(byte[] data) {
 
     @Override
     public ByteBuffer decode(ByteBuf byteBuf) {
+        return decode(byteBuf, null);
+    }
+
+    @Override
+    public ByteBuffer decode(ByteBuf byteBuf, byte[] schemaVersion) {
         if (null == byteBuf) {
             return null;
+        } else if(byteBuf.isDirect()){
+            return byteBuf.nioBuffer();

Review comment:
       We can return the NIO buffer even if it's on the heap

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -66,6 +66,13 @@
      */
     byte[] getData();
 
+    /**
+     * Get the message payload size in bytes.
+     * 

Review comment:
       We should specify whether it's the compressed or uncompressed size.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -310,6 +378,11 @@ public boolean publishedEarlierThan(long timestamp) {
         if (msgMetadata.isNullValue()) {
             return null;
         }
+        if (poolMessage) {

Review comment:
       Why do we need the special case here?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -93,93 +98,156 @@
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
                 Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
-        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0);
+        this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0, false);
     }
 
     MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
-        this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
-        this.messageId = messageId;
-        this.topic = topic;
-        this.cnx = cnx;
-        this.redeliveryCount = redeliveryCount;
+                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+                boolean pooledMessage) {
+        this.msgMetadata = new MessageMetadata();
+        init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
+    }
 
-        // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
-        // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
-        // backed by a direct buffer which we could not expose as a byte[]
-        this.payload = Unpooled.copiedBuffer(payload);
-        this.encryptionCtx = encryptionCtx;
+    public static <T> MessageImpl<T> create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean pooledMessage) {
+        if (pooledMessage) {
+            @SuppressWarnings("unchecked")
+            MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+            init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount,
+                    pooledMessage);
+            return msg;
+        } else {
+            return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema,
+                    redeliveryCount, pooledMessage);
+        }
+    }
+
+    static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata,
+            ByteBuf payload, Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema,
+            int redeliveryCount, boolean pooledMessage) {
+        msg.msgMetadata.clear();
+        msg.msgMetadata.copyFrom(msgMetadata);
+        msg.messageId = messageId;
+        msg.topic = topic;
+        msg.cnx = cnx;
+        msg.redeliveryCount = redeliveryCount;
+
+        msg.poolMessage = pooledMessage;
+        if (pooledMessage) {
+            msg.payload = payload;
+            payload.retain();
+        } else {
+            // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could
+            // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is
+            // backed by a direct buffer which we could not expose as a byte[]
+            msg.payload = Unpooled.copiedBuffer(payload);
+        }
+        msg.encryptionCtx = encryptionCtx;
 
         if (msgMetadata.getPropertiesCount() > 0) {
-            this.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
-                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue,
-                            (oldValue,newValue) -> newValue)));
+            msg.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
+                    .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue)));
         } else {
-            properties = Collections.emptyMap();
+            msg.properties = Collections.emptyMap();
         }
-        this.schema = schema;
+        msg.schema = schema;
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
-                SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema) {
-        this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0);
+            SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+            ClientCnx cnx, Schema<T> schema) {
+        this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0,
+                false);
     }
 
     MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata batchMetadata,
-                SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
-                Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount) {
-        this.msgMetadata = new MessageMetadata().copyFrom(batchMetadata);
-        this.messageId = batchMessageIdImpl;
-        this.topic = topic;
-        this.cnx = cnx;
-        this.redeliveryCount = redeliveryCount;
+            SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
+            ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean keepMessageInDirectMemory) {
+        this.msgMetadata = new MessageMetadata();
+        init(this, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema,
+                redeliveryCount, keepMessageInDirectMemory);
+
+    }
+
+    public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl batchMessageIdImpl,
+            MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+            boolean pooledMessage) {
+        if (pooledMessage) {
+            @SuppressWarnings("unchecked")
+            MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+            init(msg, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx,
+                    schema, redeliveryCount, pooledMessage);
+            return msg;
+        } else {
+            return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload,
+                    encryptionCtx, cnx, schema, redeliveryCount, pooledMessage);
+        }
+    }
 
-        this.payload = Unpooled.copiedBuffer(payload);
-        this.encryptionCtx = encryptionCtx;
+    private static <T> void init(MessageImpl<T> msg, String topic, BatchMessageIdImpl batchMessageIdImpl,
+            MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T> schema, int redeliveryCount,
+            boolean poolMessage) {
+        msg.msgMetadata.clear();
+        msg.msgMetadata.copyFrom(batchMetadata);
+        msg.messageId = batchMessageIdImpl;
+        msg.topic = topic;
+        msg.cnx = cnx;
+        msg.redeliveryCount = redeliveryCount;
+
+        msg.poolMessage = poolMessage;

Review comment:
       This portion is being repeated from the other `init()` method. Is there a way to reuse that?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuf into an object using a given version. <br/>
+     * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+     * accessing same ByteBuf for decoding and deserialization.
+     *
+     * @param byteBuf
+     *            the byte array to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuf bytes, byte[] schemaVersion) {

Review comment:
       Since `Schema` is also part of public API, we can't use Netty `ByteBuf`. Instead we can use `java.nio.ByteBuffer`







[GitHub] [pulsar] eolivelli commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611002212



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuf into an object using a given version. <br/>
+     * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+     * accessing same ByteBuf for decoding and deserialization.
+     *
+     * @param byteBuf
+     *            the byte array to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuf bytes, byte[] schemaVersion) {
+        return null;

Review comment:
       Can you please write a default implementation that calls the other `decode` method?
   
   Otherwise custom schemas may break if we come to rely more on this method in the future.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
##########
@@ -171,6 +174,12 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
         } else if (value instanceof GenericRecord) {
             Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
             data = asMap.toString();
+        } else if (value instanceof ByteBuffer) {
+            ByteBuffer payload = ((ByteBuffer)value);

Review comment:
       What about using the internal array of the ByteBuffer when `hasArray()` is true, offset == 0 and len == remaining? I did the same in BytesKafkaSource.
   This way we can save a copy.
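
A minimal sketch of the zero-copy check suggested above, using only `java.nio.ByteBuffer`. The class name `ByteBufferBytes` and the method `toBytes` are hypothetical, and the extra `position() == 0` test is an assumption added here so that the backing array matches the readable region exactly; this is not the code from `CmdConsume` or `BytesKafkaSource`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; illustrates the hasArray / offset / remaining check only.
final class ByteBufferBytes {
    private ByteBufferBytes() {
    }

    /** Returns the readable bytes of buf, reusing the backing array when it maps exactly. */
    static byte[] toBytes(ByteBuffer buf) {
        if (buf.hasArray()
                && buf.arrayOffset() == 0
                && buf.position() == 0
                && buf.remaining() == buf.array().length) {
            // The backing array is exactly the readable region, so no copy is needed.
            return buf.array();
        }
        // Direct, read-only or partially consumed buffers: copy through a duplicate
        // so that the caller's position and limit stay untouched.
        byte[] copy = new byte[buf.remaining()];
        buf.duplicate().get(copy);
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer direct = ByteBuffer.allocateDirect(3).put(new byte[] {1, 2, 3});
        direct.flip();
        System.out.println("heap buffer shares its array: " + (toBytes(heap) == heap.array()));
        System.out.println("direct buffer copied bytes: " + toBytes(direct).length);
    }
}
```

The trade-off is that the caller receives the buffer's own array in the fast path, so it must treat the result as read-only.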







[GitHub] [pulsar] rdhabalia commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611006715



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuf into an object using a given version. <br/>
+     * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+     * accessing same ByteBuf for decoding and deserialization.
+     *
+     * @param byteBuf
+     *            the byte array to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuf bytes, byte[] schemaVersion) {
+        return null;

Review comment:
       Custom schemas will not break: `MessageImpl` already handles this and calls the default `decode`. Calling the default `decode` from here would require converting the ByteBuf to a byte[], which should be handled at the top level, and in this case we handle it in `MessageImpl`.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611008720



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuf into an object using a given version. <br/>
+     * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+     * accessing same ByteBuf for decoding and deserialization.
+     *
+     * @param byteBuf
+     *            the byte array to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuf bytes, byte[] schemaVersion) {
+        return null;

Review comment:
       If you look at this from Pulsar's point of view, you are right.
   But sometimes, in your application or in a Pulsar IO Sink, you have a Schema instance and you want to decode a given payload using that Schema.
   In this case, if you see that there is a `decode(ByteBuf)` method and you already have a `ByteBuf`, you are tempted to use the new method, but it is not implemented for most of the system Schemas, and it is certainly not implemented for new custom Schemas.
   
   This is why I believe we should provide a default implementation that relies on the fact that the `decode(byte[]....)` method is always implemented.
   
   The default implementation is straightforward and I believe it is worth adding.
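
A sketch of what such a default implementation could look like. `SketchSchema` and `Utf8Schema` are hypothetical stand-ins, not the Pulsar `Schema` interface, and the example assumes a `java.nio.ByteBuffer` parameter rather than Netty's `ByteBuf` so that it needs only the JDK. The copy goes through `duplicate()` so the caller's position and limit are not modified, in line with the Javadoc note in the diff.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a schema interface; not the Pulsar Schema API.
interface SketchSchema<T> {
    /** The method every implementation is assumed to provide. */
    T decode(byte[] bytes);

    /** Default buffer-based decode that falls back to the byte[] variant. */
    default T decode(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        byte[] bytes = new byte[buffer.remaining()];
        // Read through a duplicate so the caller's position and limit are left unchanged.
        buffer.duplicate().get(bytes);
        return decode(bytes);
    }
}

// A custom schema that only implements decode(byte[]) still works with buffers.
final class Utf8Schema implements SketchSchema<String> {
    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With this shape, a schema that wants to avoid the copy can still override the buffer-based method, while schemas that do nothing keep working.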







[GitHub] [pulsar] rdhabalia commented on a change in pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r616919819



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a ByteBuf into an object using a given version. <br/>
+     * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+     * accessing same ByteBuf for decoding and deserialization.
+     *
+     * @param byteBuf
+     *            the byte array to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(ByteBuf bytes, byte[] schemaVersion) {

Review comment:
       I introduced `ByteBuf` earlier because [AbstractSchema](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java#L60) already has the same API, so it won't impact the existing Schema implementations, as all of them extend `AbstractSchema`.
   But I agree: `Schema` is part of the public API and should not expose `ByteBuf`, so I changed it to `ByteBuffer`.







[GitHub] [pulsar] rdhabalia merged pull request #10184: PIP 83 : Pulsar client: Message consumption with pooled buffer

Posted by GitBox <gi...@apache.org>.
rdhabalia merged pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184


   

