Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/18 18:40:05 UTC

[GitHub] [pulsar] tjiuming opened a new pull request, #16659: [schema][client][improve] Add decode InputStream for Schema

tjiuming opened a new pull request, #16659:
URL: https://github.com/apache/pulsar/pull/16659

   ### Motivation
   
   Add `decode(InputStream)` to Schema so that a message payload can be decoded without first being copied into a heap-allocated byte array.
   
   ### Modifications
   
   1. Add new methods with default implementations to Schema.java.
   2. Implement the new methods for AbstractStructSchema and AutoConsumeSchema.
   3. Replace decode(byte[]) with decode(InputStream) in MessageImpl#decodeBySchema.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency):  (no)
     - The public API: (yes)
     - The schema: (yes)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nodece commented on pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#issuecomment-1200454831

   @tjiuming There's no need to run `/pulsarbot run-failure-checks` again; the CI has already passed.




[GitHub] [pulsar] tjiuming commented on pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#issuecomment-1200452745

   /pulsarbot run-failure-checks




[GitHub] [pulsar] nodece commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925416800


##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ */
+public final class IOUtils {

Review Comment:
   You don't need to change anything; this is just a note.
   





[GitHub] [pulsar] nahguam commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nahguam commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925552061


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java:
##########
@@ -1253,14 +1253,14 @@ private void testIncompatibleSchema() throws Exception {
         Message<User> message1 = consumer.receive();
         Assert.assertEquals(test, message1.getValue());
         Message<User> message2 = consumer.receive();
+
+        boolean exceptionHappened = false;

Review Comment:
   AssertJ has assertions for these cases:
   
   ```
   assertThatExceptionOfType(SchemaSerializationException.class).isThrownBy(() -> {
               message2.getValue();
           });
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:
##########
@@ -489,9 +490,9 @@ private T decodeBySchema(byte[] schemaVersion) {
             return value;
         }
         if (null == schemaVersion) {
-            return schema.decode(getData());
+            return schema.decode(new ByteBufInputStream(payload));

Review Comment:
   I presume this bit is the optimisation that prevents the payload copy?



##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ */
+public final class IOUtils {

Review Comment:
   Do you mean it was copied from JDK 9+ `InputStream.readAllBytes()`?
   
   Would it be useful to put a comment in the code to describe it as such? Perhaps also a TODO to remove it when the client code compilation target is upgraded to >= 9.
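
For illustration, a minimal sketch of what a Java 8-compatible replacement for `InputStream.readAllBytes()` could look like, with the kind of TODO suggested above. This is not the code from the PR, and it is simpler than the JDK implementation; the class name `StreamBytes` and the buffer size are assumptions made for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical Java 8 stand-in for InputStream.readAllBytes() (JDK 9+). */
final class StreamBytes {
    // Assumed chunk size; any positive value works.
    private static final int BUFFER_SIZE = 8192;

    private StreamBytes() {
    }

    // TODO: remove once the compilation target is >= 9 and call input.readAllBytes() instead.
    static byte[] readAllBytes(InputStream input) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[BUFFER_SIZE];
        int n;
        // read() returns -1 once the end of the stream is reached.
        while ((n = input.read(chunk, 0, chunk.length)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }
}
```

A helper like this needs only `java.io`, so it would not add a dependency to the module that hosts it.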





[GitHub] [pulsar] nodece commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925311163


##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ */
+public final class IOUtils {

Review Comment:
   This implementation is copied from the JDK.





[GitHub] [pulsar] nodece commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r926196390


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java:
##########
@@ -1253,14 +1253,14 @@ private void testIncompatibleSchema() throws Exception {
         Message<User> message1 = consumer.receive();
         Assert.assertEquals(test, message1.getValue());
         Message<User> message2 = consumer.receive();
+
+        boolean exceptionHappened = false;

Review Comment:
   TestNG:
   ```
   assertThrows(SchemaSerializationException.class, () -> message2.getValue());
   ```
   





[GitHub] [pulsar] tjiuming commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925593238


##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ */
+public final class IOUtils {

Review Comment:
   Added a comment.





[GitHub] [pulsar] tjiuming commented on pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#issuecomment-1200390903

   @congbobo184 PTAL




[GitHub] [pulsar] tjiuming commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r932851305


##########
pulsar-client/src/main/java/org/apache/pulsar/client/util/ReadonlyByteBufInputStream.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wrap ByteBuf to InputStream, this will not move ByteBuf readerIndex.
+ */
+public final class ReadonlyByteBufInputStream extends InputStream {

Review Comment:
   Yes, I was just trying this out to see whether the CI checks pass; I'll delete them later.





[GitHub] [pulsar] tjiuming commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925370433


##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ */
+public final class IOUtils {

Review Comment:
   Yes, it's from OracleJDK. Is that not permitted? If so, I'll switch to another implementation.





[GitHub] [pulsar] tjiuming commented on pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#issuecomment-1199109322

   I've discussed this with @congbobo184: introducing `decode(InputStream)` to Schema may lead users to misuse it, so `decode(ByteBuffer)` is enough.
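
As a rough sketch of the `decode(ByteBuffer)` idea, assuming a default method that falls back to the existing `decode(byte[])`: the interface `SketchSchema` and the class `Utf8StringSchema` below are hypothetical and simplified, and are not Pulsar's actual `Schema` API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for a schema interface; not Pulsar's actual Schema. */
interface SketchSchema<T> {
    T decode(byte[] bytes);

    // Default overload, so existing implementations keep compiling unchanged.
    default T decode(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer is left untouched.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return decode(bytes);
    }
}

/** Example implementation that only provides the byte[] variant. */
final class Utf8StringSchema implements SketchSchema<String> {
    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The default shown here still copies the bytes; under this design, a schema that can read from the buffer directly would override `decode(ByteBuffer)` to skip that copy.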




[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r929493209


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:
##########
@@ -489,9 +490,9 @@ private T decodeBySchema(byte[] schemaVersion) {
             return value;
         }
         if (null == schemaVersion) {
-            return schema.decode(getData());
+            return schema.decode(new ReadonlyByteBufInputStream(payload));

Review Comment:
   Why do we need to change this logic?
   
   https://github.com/apache/pulsar/blob/176418765eed3775758a1293217ca5928b47579b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java#L70-L72
   
   This has already been changed to `ByteBufInputStream`, right?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:
##########
@@ -489,9 +490,9 @@ private T decodeBySchema(byte[] schemaVersion) {
             return value;
         }
         if (null == schemaVersion) {
-            return schema.decode(getData());
+            return schema.decode(new ReadonlyByteBufInputStream(payload));
         } else {
-            return schema.decode(getData(), schemaVersion);
+            return schema.decode(new ReadonlyByteBufInputStream(payload), schemaVersion);

Review Comment:
   Same as above.





[GitHub] [pulsar] nodece commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925282585


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java:
##########
@@ -120,6 +123,36 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a byte array into an object using the schema definition and deserializer implementation.
+     *
+     * @param input the inputstream to decode
+     * @return the deserialized object
+     */
+    default T decode(InputStream input) {
+        try {
+            return decode(IOUtils.readAllBytes(input));
+        } catch (IOException ex) {
+            throw new SchemaSerializationException(ex);
+        }
+    }
+
+    /**
+     * Decode a byte array into an object using a given version.
+     *
+     * @param input the inputstream to decode
+     * @param schemaVersion
+     *            the schema version to decode the object. null indicates using latest version.
+     * @return the deserialized object
+     */
+    default T decode(InputStream input, byte[] schemaVersion) {
+        try {
+            return decode(IOUtils.readAllBytes(input), schemaVersion);

Review Comment:
   ```suggestion
               return decode(org.apache.commons.io.IOUtils.toByteArray(input), schemaVersion);
   ```





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r932017143


##########
pulsar-client-api/src/main/java/org/apache/pulsar/common/utils/IOUtils.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utils for I/O operations.
+ * Copied from JDK9 InputStream readAllBytes and readNBytes, this can be remove after JDK version upgrade to JDK9.
+ */
+public final class IOUtils {

Review Comment:
   This class does not seem to be used.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/util/ReadonlyByteBufInputStream.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wrap ByteBuf to InputStream, this will not move ByteBuf readerIndex.
+ */
+public final class ReadonlyByteBufInputStream extends InputStream {

Review Comment:
   This class does not seem to be used.





[GitHub] [pulsar] Technoboy- merged pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16659:
URL: https://github.com/apache/pulsar/pull/16659




[GitHub] [pulsar] tjiuming commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925289248


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java:
##########
@@ -120,6 +123,36 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a byte array into an object using the schema definition and deserializer implementation.
+     *
+     * @param input the inputstream to decode
+     * @return the deserialized object
+     */
+    default T decode(InputStream input) {
+        try {
+            return decode(IOUtils.readAllBytes(input));

Review Comment:
   There is no apache-commons-io dependency in client-api, so this won't work. The same applies to the suggestion below.





[GitHub] [pulsar] nodece commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925282127


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java:
##########
@@ -120,6 +123,36 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
         return decode(bytes);
     }
 
+    /**
+     * Decode a byte array into an object using the schema definition and deserializer implementation.
+     *
+     * @param input the inputstream to decode
+     * @return the deserialized object
+     */
+    default T decode(InputStream input) {
+        try {
+            return decode(IOUtils.readAllBytes(input));

Review Comment:
   ```suggestion
               return decode(org.apache.commons.io.IOUtils.toByteArray(input));
   ```





[GitHub] [pulsar] tjiuming commented on a diff in pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on code in PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#discussion_r925601975


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:
##########
@@ -489,9 +490,9 @@ private T decodeBySchema(byte[] schemaVersion) {
             return value;
         }
         if (null == schemaVersion) {
-            return schema.decode(getData());
+            return schema.decode(new ByteBufInputStream(payload));

Review Comment:
   Yes. It wraps the ByteBuf in an InputStream, and since SchemaReader supports decoding from an InputStream, it doesn't have to read all the bytes into a byte array first.
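
To make the wrapping idea concrete, here is a minimal sketch of an InputStream that reads from a buffer without copying it and without moving the source's read position. It uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` so the example stays dependency-free; the class name `BufferViewInputStream` is hypothetical and this is not the PR's implementation.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Hypothetical read-only stream over a buffer; ByteBuffer stands in for Netty's ByteBuf. */
final class BufferViewInputStream extends InputStream {
    private final ByteBuffer view;

    BufferViewInputStream(ByteBuffer source) {
        // duplicate() gives an independent position and limit; no bytes are copied.
        this.view = source.duplicate();
    }

    @Override
    public int read() {
        // Mask to return an unsigned value in 0..255, or -1 at end of stream.
        return view.hasRemaining() ? (view.get() & 0xFF) : -1;
    }

    @Override
    public int read(byte[] dst, int off, int len) {
        if (len == 0) {
            return 0;
        }
        if (!view.hasRemaining()) {
            return -1;
        }
        int n = Math.min(len, view.remaining());
        view.get(dst, off, n);
        return n;
    }

    @Override
    public int available() {
        return view.remaining();
    }
}
```

A decoder that consumes such a stream incrementally only ever holds the bytes it is currently processing, which is the saving described above.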





[GitHub] [pulsar] tjiuming commented on pull request #16659: [schema][client][improve] Add decode InputStream for Schema

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #16659:
URL: https://github.com/apache/pulsar/pull/16659#issuecomment-1201963797

   /pulsarbot run-failure-checks

