Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:40:00 UTC

[GitHub] [pulsar] hnail opened a new pull request #8422: [SQL][WIP]migrate SchemaHandle to Presto-decoder

hnail opened a new pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422


   Fixes #4747, #7652
   
   ### Motivation
    
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-731749080


   @gaoran10  to help review this PR



[GitHub] [pulsar] codelipenghui commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-770232003


   @hnail @gaoran10 Could you please help take a look at the failed test? 
   ```
   Error:  Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:testCompile (default-testCompile) on project pulsar-presto-connector-original: Compilation failure
   Error:  /Users/runner/work/pulsar/pulsar/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java:[352,40] method asyncReadEntries in class org.apache.bookkeeper.mledger.impl.ManagedCursorImpl cannot be applied to given types;
   Error:    required: int,org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback,java.lang.Object,org.apache.bookkeeper.mledger.impl.PositionImpl
   Error:    found: int,java.lang.Object,java.lang.Object
   Error:    reason: actual and formal argument lists differ in length
   ```
   The interesting thing is why the other tests passed. Maybe it's because the other tests only run the install for the `core-modules` profile?



[GitHub] [pulsar] hnail commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-762642000


   /pulsarbot run-failure-checks



[GitHub] [pulsar] codelipenghui commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-728667500


   Moving to 2.8.0 first.



[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547767525



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.KEY
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.VALUE
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedKey;
             if (this.currentMessage.getKeyBytes().isPresent()) {
-                keyByteBuf = this.currentMessage.getKeyBytes().get();
+                decodedKey = keyDecoder.decodeRow(this.currentMessage.getKeyBytes().get());
+                decodedKey.ifPresent(currentRowValuesMap::putAll);
             }
-            currentRecord = this.schemaHandler.deserialize(keyByteBuf,
-                    this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue =
+                    messageDecoder.decodeRow(this.currentMessage.getData());
+            decodedValue.ifPresent(currentRowValuesMap::putAll);
         } else {
-            currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(),
-                    this.currentMessage.getSchemaVersion());
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.NONE
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue =
+                    messageDecoder.decodeRow(this.currentMessage.getData());
+            decodedValue.ifPresent(currentRowValuesMap::putAll);
+        }
+
+        for (DecoderColumnHandle columnHandle : columnHandles) {
+            if (columnHandle.isInternal()) {
+                if (PulsarInternalColumn.PARTITION.getName().equals(columnHandle.getName())) {

Review comment:
       Thanks for the suggestion. `PulsarInternalColumn` is a `static` `final` field, so this improvement is probably relatively small. If we want to optimize this, I suggest doing it in a separate PR.
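
If the internal-column lookup is revisited in a separate PR, one option is to resolve a column name to an enum constant through a map built once, then `switch` on that constant per row instead of chaining `String.equals` checks. This is a hypothetical sketch: `InternalColumnSketch` and `InternalColumnDispatchDemo` are illustrative stand-ins rather than Pulsar classes, the column-name strings are assumed, and the message fields are reduced to plain arguments.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for PulsarInternalColumn; the column names below are
// illustrative and only three internal columns are modeled.
enum InternalColumnSketch {
    PARTITION("__partition__"),
    EVENT_TIME("__event_time__"),
    PUBLISH_TIME("__publish_time__");

    private final String columnName;

    InternalColumnSketch(String columnName) {
        this.columnName = columnName;
    }

    String columnName() {
        return columnName;
    }

    // Built once, when the enum class is initialized, so a per-row lookup is a
    // single hash probe instead of a chain of String.equals comparisons.
    private static final Map<String, InternalColumnSketch> BY_NAME;

    static {
        Map<String, InternalColumnSketch> byName = new HashMap<>();
        for (InternalColumnSketch column : values()) {
            byName.put(column.columnName, column);
        }
        BY_NAME = Collections.unmodifiableMap(byName);
    }

    static Optional<InternalColumnSketch> fromName(String name) {
        return Optional.ofNullable(BY_NAME.get(name));
    }
}

final class InternalColumnDispatchDemo {
    // Returns the value of an internal column, or empty for a regular
    // (decoder-provided) column. Message fields are simplified to primitives.
    static Optional<String> valueFor(String columnName, int partition, long eventTime, long publishTime) {
        return InternalColumnSketch.fromName(columnName).map(column -> {
            switch (column) {
                case PARTITION:
                    return String.valueOf(partition);
                case EVENT_TIME:
                    return String.valueOf(eventTime);
                case PUBLISH_TIME:
                    return String.valueOf(publishTime);
                default:
                    throw new IllegalStateException("Unhandled internal column " + column);
            }
        });
    }
}
```

As the reply notes, the gain over a few comparisons against `static final` constants is likely small, which is why it fits better in its own PR.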



[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547789188



##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
##########
@@ -179,28 +150,13 @@
     public static class Bar {
         public Integer field1;
         public String field2;
-        public Boo test;

Review comment:
       Yes, `Foo.bar` is tested in the main modules, and `TestAvroDecoder`/`TestJsonDecoder` also test nested objects in the decoder.



[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547797081



##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
##########
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.sql.presto;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.prestosql.spi.connector.ColumnMetadata;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
-import org.apache.pulsar.common.api.raw.RawMessage;
-import org.apache.pulsar.common.api.raw.RawMessageImpl;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Unit test for KeyValueSchemaHandler
- */
-@Slf4j
-public class TestPulsarKeyValueSchemaHandler {

Review comment:
       Thanks for the reminder, I added `TestPulsarRecordCursor.TestKeyValueXXXSchema`. In the current version, KeyValue is a single-layer container schema without a separate decoder, which keeps the code style similar to presto-kafka.
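
The KEY_VALUE branch of the `PulsarRecordCursor` diff decodes the key and the value with separate row decoders, each built only for the non-internal columns tagged `KEY` or `VALUE`, and merges both results into one row map. A simplified model of that flow follows; `ColumnSketch`, `RowDecoderSketch`, and `KeyValueRowAssembler` are hypothetical, and a pre-parsed `Map` stands in for the raw key and value bytes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Mirrors PulsarColumnHandle.HandleKeyValueType from the diff.
enum KeyValuePart { NONE, KEY, VALUE }

// Hypothetical, simplified column handle.
final class ColumnSketch {
    final String name;
    final KeyValuePart part;
    final boolean internal;

    ColumnSketch(String name, KeyValuePart part, boolean internal) {
        this.name = name;
        this.part = part;
        this.internal = internal;
    }
}

// Hypothetical decoder; a pre-parsed Map stands in for the raw payload bytes.
interface RowDecoderSketch {
    Optional<Map<ColumnSketch, Object>> decodeRow(Map<String, Object> payload);
}

final class KeyValueRowAssembler {
    // A decoder only fills in the columns it was created for.
    static RowDecoderSketch decoderFor(Set<ColumnSketch> columns) {
        return payload -> {
            Map<ColumnSketch, Object> row = new HashMap<>();
            for (ColumnSketch column : columns) {
                row.put(column, payload.get(column.name));
            }
            return Optional.of(row);
        };
    }

    // Same filtering as the stream pipelines in the diff: drop internal
    // columns, keep the ones tagged with the requested part.
    static Set<ColumnSketch> columnsOf(List<ColumnSketch> all, KeyValuePart part) {
        return all.stream()
                .filter(column -> !column.internal)
                .filter(column -> column.part == part)
                .collect(Collectors.toSet());
    }

    // Decode the optional key and the value independently, then merge both
    // results into a single row map.
    static Map<ColumnSketch, Object> assemble(List<ColumnSketch> columns,
                                              Optional<Map<String, Object>> key,
                                              Map<String, Object> value) {
        RowDecoderSketch keyDecoder = decoderFor(columnsOf(columns, KeyValuePart.KEY));
        RowDecoderSketch valueDecoder = decoderFor(columnsOf(columns, KeyValuePart.VALUE));

        Map<ColumnSketch, Object> row = new HashMap<>();
        key.flatMap(keyDecoder::decodeRow).ifPresent(row::putAll);
        valueDecoder.decodeRow(value).ifPresent(row::putAll);
        return row;
    }
}
```

Because the key and value are plain inputs to two ordinary decoders, no KeyValue-specific decoder is needed, which matches the single-layer container design described in the reply.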



[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547756546



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -128,9 +117,91 @@ public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (isInternal ? 1 : 0);
         result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() : 0);
-        result = 31 * result + Arrays.hashCode(fieldNames);
-        result = 31 * result + Arrays.hashCode(positionIndices);

Review comment:
       Thanks for the reminder, fixed as suggested.



[GitHub] [pulsar] hnail edited a comment on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail edited a comment on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-750006123


   > @hnail Great work. Could you rebase the branch onto apache/master first?
   
   @gaoran10 Thanks a lot for your review and help with this PR. I fixed it as you suggested; could you help re-review it?
   
   P.S. _Sorry for the late response, too._



[GitHub] [pulsar] sijie commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-735527924


   @hnail Can you rebase this pull request to the latest master?



[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532628448



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static java.lang.String.format;
+
+import com.google.inject.Inject;
+
+import io.airlift.log.Logger;
+
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.TypeManager;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.json.PulsarJsonRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.primitive.PulsarPrimitiveRowDecoderFactory;
+
+/**
+ * dispatcher RowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType}.
+ */
+public class PulsarDispatchingRowDecoderFactory {
+
+    private static final Logger log = Logger.get(PulsarDispatchingRowDecoderFactory.class);
+
+    private TypeManager typeManager;
+
+    @Inject
+    public PulsarDispatchingRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.createRowDecoder(topicName, schemaInfo, columns);
+    }
+
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType);
+    }
+
+    private PulsarRowDecoderFactory createDecoderFactory(SchemaInfo schemaInfo) {

Review comment:
       The various decoder factories could be initialized once when Pulsar SQL starts, so they can be reused.
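
One way to act on this suggestion is to build each factory once, for example in the `@Inject` constructor, and keep them in an `EnumMap` keyed by schema type so that `createDecoderFactory` becomes a lookup. This is a sketch under assumptions: `SchemaKind`, `DecoderFactorySketch`, `NamedDecoderFactory`, and `ReusingDispatchingFactory` are hypothetical stand-ins for `SchemaType`, `PulsarRowDecoderFactory`, and `PulsarDispatchingRowDecoderFactory`; the real factories would also receive the `TypeManager`; and `NONE` is left unmapped here only to show the error path.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

// Simplified stand-in for org.apache.pulsar.common.schema.SchemaType.
enum SchemaKind { AVRO, JSON, STRING, INT64, NONE }

// Hypothetical stand-in for PulsarRowDecoderFactory.
interface DecoderFactorySketch {
    String name();
}

final class NamedDecoderFactory implements DecoderFactorySketch {
    private final String name;

    NamedDecoderFactory(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

// Creates every factory once and reuses them for every
// createRowDecoder / extractColumnMetadata call.
final class ReusingDispatchingFactory {
    private final Map<SchemaKind, DecoderFactorySketch> factories;

    ReusingDispatchingFactory() {
        DecoderFactorySketch avro = new NamedDecoderFactory("avro");
        DecoderFactorySketch json = new NamedDecoderFactory("json");
        DecoderFactorySketch primitive = new NamedDecoderFactory("primitive");

        Map<SchemaKind, DecoderFactorySketch> byKind = new EnumMap<>(SchemaKind.class);
        byKind.put(SchemaKind.AVRO, avro);
        byKind.put(SchemaKind.JSON, json);
        byKind.put(SchemaKind.STRING, primitive); // primitive types share one factory
        byKind.put(SchemaKind.INT64, primitive);
        this.factories = Collections.unmodifiableMap(byKind);
    }

    DecoderFactorySketch factoryFor(SchemaKind kind) {
        DecoderFactorySketch factory = factories.get(kind);
        if (factory == null) {
            throw new UnsupportedOperationException("No decoder factory for schema type " + kind);
        }
        return factory;
    }
}
```

The factories shown in this thread only hold a `TypeManager`, so sharing one instance per schema type should be safe as long as they stay stateless; that is an assumption worth checking against the actual implementations.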



[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547783095



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.avro;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoder;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#AVRO}.
+ */
+public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        return new PulsarAvroRowDecoder((GenericAvroSchema) GenericAvroSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError

Review comment:
       I originally wanted to leave this for the future, but **as you suggest, fixing it in this PR may be a fine choice.**
   
   A circular reference is a bad schema for a relational database (e.g. _Spark_: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala#142).
   At the current code position, a cyclic definition causes a `java.lang.StackOverflowError` rather than an OOM, so it only fails the current query instead of crashing the JVM (_which is why I originally wanted to leave it for the future_).
   
   As you suggested, I added a _StackOverflowError check_ (`TestAvroDecoder/TestJsonDecoder.testCyclicDefinitionDetect`) as a simple solution. If we need a more complete solution, we can optimize it in a separate PR (based on graph traversal: https://www.geeksforgeeks.org/detect-cycle-in-a-graph/).
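
For the graph-traversal approach mentioned as a possible follow-up, a common technique is a depth-first search with three colors: reaching a record that is still on the current path (gray) means the schema refers back to itself. The sketch assumes the schema has already been reduced to a map from each record name to the record names its fields reference; walking an Avro `Schema` to build that map is not shown, and `SchemaCycleDetector` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SchemaCycleDetector {
    // WHITE = not visited, GRAY = on the current DFS path, BLACK = fully explored.
    private enum Color { WHITE, GRAY, BLACK }

    // references maps a record name to the record names its fields point at
    // (directly or through array/map/union element types).
    static boolean hasCycle(Map<String, List<String>> references) {
        Map<String, Color> colors = new HashMap<>();
        for (String record : references.keySet()) {
            if (colors.getOrDefault(record, Color.WHITE) == Color.WHITE
                    && visit(record, references, colors)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String record, Map<String, List<String>> references,
                                 Map<String, Color> colors) {
        colors.put(record, Color.GRAY);
        for (String next : references.getOrDefault(record, Collections.emptyList())) {
            Color color = colors.getOrDefault(next, Color.WHITE);
            if (color == Color.GRAY) {
                return true; // back edge: next is still on the current path
            }
            if (color == Color.WHITE && visit(next, references, colors)) {
                return true;
            }
        }
        colors.put(record, Color.BLACK); // shared (diamond) references are not cycles
        return false;
    }
}
```

Each record turns gray at most once, so the recursion depth is bounded by the number of distinct record names and the check terminates on cyclic input, unlike expanding the schema into columns.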



[GitHub] [pulsar] jiazhai merged pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
jiazhai merged pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422



[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532604246



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -88,8 +85,6 @@ public String toString() {
         return "PulsarColumnMetadata{"
             + "isInternal=" + isInternal
             + ", nameWithCase='" + nameWithCase + '\''
-            + ", fieldNames=" + Arrays.toString(fieldNames)
-            + ", positionIndices=" + Arrays.toString(positionIndices)

Review comment:
       Does the `DecoderExtraInfo` need to be printed?







[GitHub] [pulsar] jiazhai commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-769506253


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532660123



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       It seems that a decoder cache keyed by schema version could be added, so that decoders are reused across messages instead of being recreated for each one.
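       A minimal sketch of such a cache, under stated assumptions: the class name `VersionedDecoderCache` and its factory function are hypothetical, and the generic type `D` stands in for the real decoder type (e.g. `PulsarRowDecoder`). The schema version is wrapped in a `ByteBuffer` because `byte[]` uses identity `equals`/`hashCode`, while `ByteBuffer` compares by content.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical sketch: cache one decoder per schema version so that
 * advanceNextPosition() does not rebuild decoders for every message.
 */
public class VersionedDecoderCache<D> {

    // Keys compare by byte content, unlike raw byte[] keys.
    private final Map<ByteBuffer, D> decoders = new ConcurrentHashMap<>();
    private final Function<byte[], D> factory;

    public VersionedDecoderCache(Function<byte[], D> factory) {
        this.factory = factory;
    }

    public D get(byte[] schemaVersion) {
        // Copy so later mutation of the caller's array cannot corrupt the key.
        ByteBuffer key = ByteBuffer.wrap(schemaVersion.clone());
        return decoders.computeIfAbsent(key, k -> factory.apply(schemaVersion));
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        VersionedDecoderCache<String> cache = new VersionedDecoderCache<>(
                v -> "decoder-v" + v[0] + "#" + created.incrementAndGet());
        cache.get(new byte[] {1});
        cache.get(new byte[] {1}); // same content, new array: cache hit
        cache.get(new byte[] {2});
        System.out.println(created.get()); // 2
    }
}
```

       For `KEY_VALUE` schemas the key and value decoders differ, so they would likely need separate caches (or a composite key of version plus key/value role).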

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.avro;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoder;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#AVRO}.
+ */
+public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        return new PulsarAvroRowDecoder((GenericAvroSchema) GenericAvroSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError

Review comment:
       Has this problem been fixed?

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.KEY
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       same as above.

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
##########
@@ -110,16 +81,20 @@
 
     protected Map<TopicName, PulsarRecordCursor> pulsarRecordCursors = new HashMap<>();
 
+    protected static PulsarDispatchingRowDecoderFactory dispatchingRowDecoderFactory;
+
     protected final static PulsarConnectorId pulsarConnectorId = new PulsarConnectorId("test-connector");
 
-    protected static List<TopicName> topicNames;
-    protected static List<TopicName> partitionedTopicNames;
+    protected static List<TopicName>  topicNames;
+    protected static List<TopicName>  partitionedTopicNames;

Review comment:
       ```suggestion
       protected static List<TopicName> partitionedTopicNames;
   ```

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.avro;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoder;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#AVRO}.
+ */
+public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        return new PulsarAvroRowDecoder((GenericAvroSchema) GenericAvroSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError
+
+        return schema.getFields().stream()
+                .map(field ->
+                        new PulsarColumnMetadata(field.name(), parseAvroPrestoType(
+                                field.name(), field.schema()), field.schema().toString(), null, false, false,
+                                handleKeyValueType, new PulsarColumnMetadata.DecoderExtraInfo(field.name(),
+                                null, null))
+
+                ).collect(toList());
+    }
+
+    private Type parseAvroPrestoType(String fieldname, Schema schema) {
+        Schema.Type type = schema.getType();
+        LogicalType logicalType  = schema.getLogicalType();
+        switch (type) {
+            case STRING:
+            case ENUM:
+                return createUnboundedVarcharType();
+            case NULL:
+                throw new UnsupportedOperationException(
+                        format("field '%s' NULL type code should not be reached,"
+                                + "please check the schema or report the bug.", fieldname));
+            case FIXED:
+            case BYTES:
+                //TODO: support decimal logicalType
+                return VarbinaryType.VARBINARY;
+            case INT:
+                if (logicalType == LogicalTypes.timeMillis()) {
+                    return TIME;
+                } else if (logicalType == LogicalTypes.date()) {
+                    return DATE;
+                }
+                return IntegerType.INTEGER;
+            case LONG:
+                if (logicalType == LogicalTypes.timestampMillis()) {
+                    return TimestampType.TIMESTAMP;
+                }
+                //TODO:  support timestamp_microseconds logicalType : https://github.com/prestosql/presto/issues/1284

Review comment:
       same as above.
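       Regarding the `timestamp_microseconds` TODO: Avro's `timestamp-micros` logical type stores microseconds since the Unix epoch in a `long`. Until microsecond precision is supported, one hedged interim option, assuming the target timestamp type holds milliseconds, is to truncate the value, accepting the loss of sub-millisecond precision. The helper below is hypothetical.

```java
public class TimestampMicros {

    /** Converts Avro timestamp-micros (microseconds since epoch) to epoch milliseconds. */
    public static long microsToMillis(long epochMicros) {
        // floorDiv rounds toward negative infinity, so pre-1970 values truncate consistently.
        return Math.floorDiv(epochMicros, 1000L);
    }

    public static void main(String[] args) {
        System.out.println(microsToMillis(1_604_410_800_123_456L)); // 1604410800123
        System.out.println(microsToMillis(-1L));                    // -1
    }
}
```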

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
##########
@@ -179,28 +150,13 @@
     public static class Bar {
         public Integer field1;
         public String field2;
-        public Boo test;

Review comment:
       Is there a test that covers queries on nested objects?

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.KEY
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.VALUE
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedKey;
             if (this.currentMessage.getKeyBytes().isPresent()) {
-                keyByteBuf = this.currentMessage.getKeyBytes().get();
+                decodedKey = keyDecoder.decodeRow(this.currentMessage.getKeyBytes().get());
+                decodedKey.ifPresent(currentRowValuesMap::putAll);
             }
-            currentRecord = this.schemaHandler.deserialize(keyByteBuf,
-                    this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue =
+                    messageDecoder.decodeRow(this.currentMessage.getData());
+            decodedValue.ifPresent(currentRowValuesMap::putAll);
         } else {
-            currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(),
-                    this.currentMessage.getSchemaVersion());
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.NONE
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+            Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue =
+                    messageDecoder.decodeRow(this.currentMessage.getData());
+            decodedValue.ifPresent(currentRowValuesMap::putAll);
+        }
+
+        for (DecoderColumnHandle columnHandle : columnHandles) {
+            if (columnHandle.isInternal()) {
+                if (PulsarInternalColumn.PARTITION.getName().equals(columnHandle.getName())) {

Review comment:
       A `switch` would be more efficient here, and `PulsarInternalColumn` could be changed to an enum.
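       One way the suggestion could look, as a sketch only: the enum below is hypothetical and its column names are an assumed subset for illustration, not the actual `PulsarInternalColumn` definition. The column name is resolved to an enum constant through a prebuilt map, and the value is then chosen with a `switch` instead of a chain of `equals()` comparisons.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch: internal columns modeled as an enum and dispatched with a switch. */
public class InternalColumnSwitch {

    // Assumed subset of internal column names, for illustration only.
    public enum InternalColumn {
        PARTITION("__partition__"),
        EVENT_TIME("__event_time__"),
        MESSAGE_ID("__message_id__");

        private final String columnName;

        InternalColumn(String columnName) {
            this.columnName = columnName;
        }

        // Built once; enum constants are initialized before this static field.
        private static final Map<String, InternalColumn> BY_NAME = Arrays.stream(values())
                .collect(Collectors.toMap(c -> c.columnName, Function.identity()));

        public static InternalColumn fromName(String name) {
            InternalColumn column = BY_NAME.get(name);
            if (column == null) {
                throw new IllegalArgumentException("Unknown internal column: " + name);
            }
            return column;
        }
    }

    /** Resolves the column, then switches on the enum instead of chaining equals(). */
    public static Object internalValue(String columnName, int partition, long eventTime, String messageId) {
        switch (InternalColumn.fromName(columnName)) {
            case PARTITION:
                return partition;
            case EVENT_TIME:
                return eventTime;
            case MESSAGE_ID:
                return messageId;
            default:
                throw new IllegalStateException("Unhandled internal column: " + columnName);
        }
    }

    public static void main(String[] args) {
        System.out.println(internalValue("__partition__", 3, 1604410800000L, "12:5:0")); // 3
    }
}
```

       In the cursor, the enum constant could presumably be resolved once per column handle when the cursor is created, so the per-row cost would be just the `switch`.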

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
##########
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.json;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#JSON}.
+ */
+public class PulsarJsonRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarJsonRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarJsonRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                                 Set<DecoderColumnHandle> columns) {
+        return new PulsarJsonRowDecoder((GenericJsonSchema) GenericJsonSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError

Review comment:
       Has this problem been fixed?

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
##########
@@ -110,16 +81,20 @@
 
     protected Map<TopicName, PulsarRecordCursor> pulsarRecordCursors = new HashMap<>();
 
+    protected static PulsarDispatchingRowDecoderFactory dispatchingRowDecoderFactory;
+
     protected final static PulsarConnectorId pulsarConnectorId = new PulsarConnectorId("test-connector");
 
-    protected static List<TopicName> topicNames;
-    protected static List<TopicName> partitionedTopicNames;
+    protected static List<TopicName>  topicNames;

Review comment:
       ```suggestion
       protected static List<TopicName> topicNames;
   ```

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.avro;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoder;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#AVRO}.
+ */
+public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        return new PulsarAvroRowDecoder((GenericAvroSchema) GenericAvroSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError
+
+        return schema.getFields().stream()
+                .map(field ->
+                        new PulsarColumnMetadata(field.name(), parseAvroPrestoType(
+                                field.name(), field.schema()), field.schema().toString(), null, false, false,
+                                handleKeyValueType, new PulsarColumnMetadata.DecoderExtraInfo(field.name(),
+                                null, null))
+
+                ).collect(toList());
+    }
+
+    private Type parseAvroPrestoType(String fieldname, Schema schema) {
+        Schema.Type type = schema.getType();
+        LogicalType logicalType  = schema.getLogicalType();
+        switch (type) {
+            case STRING:
+            case ENUM:
+                return createUnboundedVarcharType();
+            case NULL:
+                throw new UnsupportedOperationException(
+                        format("field '%s' NULL type code should not be reached,"
+                                + "please check the schema or report the bug.", fieldname));
+            case FIXED:
+            case BYTES:
+                //TODO: support decimal logicalType

Review comment:
       This feature will be added in the future, right?
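       For context on the decimal TODO: per the Avro specification, a `decimal` logical type on `BYTES` or `FIXED` stores the unscaled value as a big-endian two's-complement integer, with the scale fixed in the schema. A minimal decoding sketch follows (the class name is hypothetical; Avro also ships `Conversions.DecimalConversion` for this). Mapping the result to a Presto decimal type is left out.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class AvroDecimalSketch {

    /**
     * Decodes an Avro decimal stored as BYTES/FIXED: the bytes are the big-endian
     * two's-complement unscaled value, and the scale comes from the schema.
     */
    public static BigDecimal decode(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    public static void main(String[] args) {
        // 0x04D2 = 1234 unscaled; with scale 2 -> 12.34
        System.out.println(decode(new byte[] {0x04, (byte) 0xD2}, 2)); // 12.34
        // 0xFF = -1 in two's complement; with scale 1 -> -0.1
        System.out.println(decode(new byte[] {(byte) 0xFF}, 1)); // -0.1
    }
}
```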

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
##########
@@ -110,16 +81,20 @@
 
     protected Map<TopicName, PulsarRecordCursor> pulsarRecordCursors = new HashMap<>();
 
+    protected static PulsarDispatchingRowDecoderFactory dispatchingRowDecoderFactory;
+
     protected final static PulsarConnectorId pulsarConnectorId = new PulsarConnectorId("test-connector");
 
-    protected static List<TopicName> topicNames;
-    protected static List<TopicName> partitionedTopicNames;
+    protected static List<TopicName>  topicNames;
+    protected static List<TopicName>  partitionedTopicNames;
     protected static Map<String, Integer> partitionedTopicsToPartitions;
     protected static Map<String, SchemaInfo> topicsToSchemas;
     protected static Map<String, Long> topicsToNumEntries;
 
     private final static ObjectMapper objectMapper = new ObjectMapper();
 
+    protected static List<String> fooFieldNames =  new ArrayList<>();

Review comment:
       ```suggestion
       protected static List<String> fooFieldNames = new ArrayList<>();
   ```

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarKeyValueSchemaHandler.java
##########
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.sql.presto;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.prestosql.spi.connector.ColumnMetadata;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
-import org.apache.pulsar.common.api.raw.RawMessage;
-import org.apache.pulsar.common.api.raw.RawMessageImpl;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Unit test for KeyValueSchemaHandler
- */
-@Slf4j
-public class TestPulsarKeyValueSchemaHandler {

Review comment:
       Could you add a test for `KeyValueSchema` data?

##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
##########
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.sql.presto;
-
-import io.netty.buffer.ByteBufAllocator;
-import io.prestosql.spi.connector.ColumnMetadata;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.client.impl.schema.BooleanSchema;
-import org.apache.pulsar.client.impl.schema.ByteSchema;
-import org.apache.pulsar.client.impl.schema.BytesSchema;
-import org.apache.pulsar.client.impl.schema.DateSchema;
-import org.apache.pulsar.client.impl.schema.DoubleSchema;
-import org.apache.pulsar.client.impl.schema.FloatSchema;
-import org.apache.pulsar.client.impl.schema.IntSchema;
-import org.apache.pulsar.client.impl.schema.LongSchema;
-import org.apache.pulsar.client.impl.schema.ShortSchema;
-import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.impl.schema.TimeSchema;
-import org.apache.pulsar.client.impl.schema.TimestampSchema;
-import org.apache.pulsar.common.api.raw.RawMessage;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@Slf4j
-public class TestPulsarPrimitiveSchemaHandler {

Review comment:
       Could you add a test for `PulsarPrimitiveRowDecoder`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] hnail edited a comment on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail edited a comment on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-750006123


   > @hnail Great work. Could you rebase the branch with apache/master first?
   
   @gaoran10 Thanks a lot for your review and help with this PR. I fixed it as you suggested. Could you help re-review it?
   
   PS: _Sorry for the late response._
   





[GitHub] [pulsar] Anonymitaet commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-751991578


   Confirmed w/ @congbobo184, no need to update the docs for this PR. 





[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r559068257



##########
File path: pulsar-sql/presto-pulsar/pom.xml
##########
@@ -109,6 +109,32 @@
           <version>${joda.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.prestosql</groupId>
+            <artifactId>presto-record-decoder</artifactId>
+            <version>${presto.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review comment:
       It seems that the dependency `pulsar-client-original` is already in the `pulsar-client-admin-original`.







[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532628448



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static java.lang.String.format;
+
+import com.google.inject.Inject;
+
+import io.airlift.log.Logger;
+
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.TypeManager;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.json.PulsarJsonRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.primitive.PulsarPrimitiveRowDecoderFactory;
+
+/**
+ * dispatcher RowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType}.
+ */
+public class PulsarDispatchingRowDecoderFactory {
+
+    private static final Logger log = Logger.get(PulsarDispatchingRowDecoderFactory.class);
+
+    private TypeManager typeManager;
+
+    @Inject
+    public PulsarDispatchingRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.createRowDecoder(topicName, schemaInfo, columns);
+    }
+
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType);
+    }
+
+    private PulsarRowDecoderFactory createDecoderFactory(SchemaInfo schemaInfo) {

Review comment:
       The various decoder factories could be initialized once when Pulsar SQL starts and then reused.
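Below is a minimal sketch of the suggestion. It assumes the factories are stateless and safe to share across queries. They are built once (for example, in the constructor that Guice injects) and looked up by schema type, instead of a new factory being created on every call. `SchemaType` and `RowDecoderFactory` here are simplified stand-ins for the PR's `SchemaType` and `PulsarRowDecoderFactory`, not the real types.

```java
import java.util.EnumMap;
import java.util.Map;

public final class DecoderFactoryRegistrySketch {

    // Simplified stand-in for org.apache.pulsar.common.schema.SchemaType (subset only).
    enum SchemaType { AVRO, JSON, STRING, INT64, KEY_VALUE }

    // Simplified stand-in for the PR's PulsarRowDecoderFactory interface.
    interface RowDecoderFactory { String name(); }

    private final Map<SchemaType, RowDecoderFactory> factories = new EnumMap<>(SchemaType.class);

    DecoderFactoryRegistrySketch() {
        // Built once, when the registry is constructed (e.g. at connector startup via injection).
        RowDecoderFactory avro = () -> "avro";
        RowDecoderFactory json = () -> "json";
        RowDecoderFactory primitive = () -> "primitive";
        factories.put(SchemaType.AVRO, avro);
        factories.put(SchemaType.JSON, json);
        // One primitive factory instance is shared by all primitive schema types.
        factories.put(SchemaType.STRING, primitive);
        factories.put(SchemaType.INT64, primitive);
        // KEY_VALUE is deliberately left unregistered to show the failure path.
    }

    RowDecoderFactory lookup(SchemaType type) {
        RowDecoderFactory factory = factories.get(type);
        if (factory == null) {
            throw new UnsupportedOperationException("no decoder factory for schema type " + type);
        }
        return factory;
    }

    public static void main(String[] args) {
        DecoderFactoryRegistrySketch registry = new DecoderFactoryRegistrySketch();
        System.out.println(registry.lookup(SchemaType.AVRO).name()); // prints avro
        // Repeated lookups return the same shared instance.
        System.out.println(registry.lookup(SchemaType.INT64) == registry.lookup(SchemaType.STRING)); // prints true
    }
}
```

An `EnumMap` keeps the lookup cheap and makes it explicit which schema types have a decoder.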







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547752566



##########
File path: pulsar-sql/presto-pulsar/pom.xml
##########
@@ -109,6 +109,32 @@
           <version>${joda.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.prestosql</groupId>
+            <artifactId>presto-record-decoder</artifactId>
+            <version>${presto.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review comment:
       Yes, `PulsarRowDecoder.decodeRow()` calls `GenericXXXSchema.decode(ByteBuf byteBuf)` to decode the GenericRecord.







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547763068



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static java.lang.String.format;
+
+import com.google.inject.Inject;
+
+import io.airlift.log.Logger;
+
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.TypeManager;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.json.PulsarJsonRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.primitive.PulsarPrimitiveRowDecoderFactory;
+
+/**
+ * dispatcher RowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType}.
+ */
+public class PulsarDispatchingRowDecoderFactory {
+
+    private static final Logger log = Logger.get(PulsarDispatchingRowDecoderFactory.class);
+
+    private TypeManager typeManager;
+
+    @Inject
+    public PulsarDispatchingRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.createRowDecoder(topicName, schemaInfo, columns);
+    }
+
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType);
+    }
+
+    private PulsarRowDecoderFactory createDecoderFactory(SchemaInfo schemaInfo) {

Review comment:
       @gaoran10 Agreed, that sounds like a good idea to me. But I propose handling this optimization in a separate `PR`. For now, keeping `PR-8422` simpler makes it easier to review. Does that work for you?







[GitHub] [pulsar] hnail commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-770346111


   > @hnail @gaoran10 Could you please help take a look the failed test?
   > 
   > ```
   > Error:  Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:testCompile (default-testCompile) on project pulsar-presto-connector-original: Compilation failure
   > Error:  /Users/runner/work/pulsar/pulsar/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java:[352,40] method asyncReadEntries in class org.apache.bookkeeper.mledger.impl.ManagedCursorImpl cannot be applied to given types;
   > Error:    required: int,org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback,java.lang.Object,org.apache.bookkeeper.mledger.impl.PositionImpl
   > Error:    found: int,java.lang.Object,java.lang.Object
   > Error:    reason: actual and formal argument lists differ in length
   > ```
   > 
   > The interesting thing is why the other tests passed. Might it be because the other tests only run the install for the `core-modules` profile?
   
   @codelipenghui fixed. cc @gaoran10 @jiazhai 





[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532628448



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarDispatchingRowDecoderFactory.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static java.lang.String.format;
+
+import com.google.inject.Inject;
+
+import io.airlift.log.Logger;
+
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.TypeManager;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.sql.presto.decoder.avro.PulsarAvroRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.json.PulsarJsonRowDecoderFactory;
+import org.apache.pulsar.sql.presto.decoder.primitive.PulsarPrimitiveRowDecoderFactory;
+
+/**
+ * dispatcher RowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType}.
+ */
+public class PulsarDispatchingRowDecoderFactory {
+
+    private static final Logger log = Logger.get(PulsarDispatchingRowDecoderFactory.class);
+
+    private TypeManager typeManager;
+
+    @Inject
+    public PulsarDispatchingRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.createRowDecoder(topicName, schemaInfo, columns);
+    }
+
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        PulsarRowDecoderFactory rowDecoderFactory = createDecoderFactory(schemaInfo);
+        return rowDecoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType);
+    }
+
+    private PulsarRowDecoderFactory createDecoderFactory(SchemaInfo schemaInfo) {

Review comment:
       The various decoder factories could be initialized at the Pulsar SQL beginning.







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547754752



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -88,8 +85,6 @@ public String toString() {
         return "PulsarColumnMetadata{"
             + "isInternal=" + isInternal
             + ", nameWithCase='" + nameWithCase + '\''
-            + ", fieldNames=" + Arrays.toString(fieldNames)
-            + ", positionIndices=" + Arrays.toString(positionIndices)

Review comment:
       @gaoran10 Thanks for the reminder, fixed as suggested.







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547763797



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.KEY
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       Same as above. I propose optimizing this in a separate PR.
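For that follow-up PR, one possible approach is to cache row decoders per schema version, instead of calling `createRowDecoder` for every record. This is a minimal sketch. It assumes the schema version is available as a `byte[]` (as `getSchemaVersion()` suggests) and that the column set is fixed for the lifetime of a cursor, so the version alone is a sufficient key. `RowDecoderCacheSketch` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-cursor cache: one decoder per distinct schema version.
public final class RowDecoderCacheSketch<D> {
    // byte[] uses identity equality, so the version bytes are wrapped in a
    // ByteBuffer, whose equals()/hashCode() compare the remaining content.
    private final Map<ByteBuffer, D> decoders = new ConcurrentHashMap<>();
    private final Function<byte[], D> factory;

    RowDecoderCacheSketch(Function<byte[], D> factory) {
        this.factory = factory;
    }

    D get(byte[] schemaVersion) {
        // Copy the bytes so later mutation by the caller cannot corrupt the key.
        ByteBuffer key = ByteBuffer.wrap(schemaVersion.clone());
        return decoders.computeIfAbsent(key, k -> factory.apply(schemaVersion));
    }

    int size() {
        return decoders.size();
    }

    public static void main(String[] args) {
        int[] created = {0};
        RowDecoderCacheSketch<String> cache = new RowDecoderCacheSketch<>(v -> {
            created[0]++;
            return "decoder-v" + v[v.length - 1];
        });
        cache.get(new byte[]{0, 1});
        cache.get(new byte[]{0, 1}); // same content, different array: cache hit
        cache.get(new byte[]{0, 2});
        System.out.println(created[0] + " decoders built, " + cache.size() + " cached"); // prints 2 decoders built, 2 cached
    }
}
```

Wrapping the bytes matters here. A raw `byte[]` key compares by identity, so two arrays with the same content would never produce a cache hit.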







[GitHub] [pulsar] codelipenghui commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-725434949


   @gaoran10 Could you please help review this PR?





[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532604644



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -88,8 +85,6 @@ public String toString() {
         return "PulsarColumnMetadata{"
             + "isInternal=" + isInternal
             + ", nameWithCase='" + nameWithCase + '\''
-            + ", fieldNames=" + Arrays.toString(fieldNames)
-            + ", positionIndices=" + Arrays.toString(positionIndices)

Review comment:
       Does the `DecoderExtraInfo` need to be printed?







[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532580116



##########
File path: pulsar-sql/presto-pulsar/pom.xml
##########
@@ -109,6 +109,32 @@
           <version>${joda.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>io.prestosql</groupId>
+            <artifactId>presto-record-decoder</artifactId>
+            <version>${presto.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review comment:
       Is this dependency necessary?







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547789990



##########
File path: pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
##########
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.sql.presto;
-
-import io.netty.buffer.ByteBufAllocator;
-import io.prestosql.spi.connector.ColumnMetadata;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.client.impl.schema.BooleanSchema;
-import org.apache.pulsar.client.impl.schema.ByteSchema;
-import org.apache.pulsar.client.impl.schema.BytesSchema;
-import org.apache.pulsar.client.impl.schema.DateSchema;
-import org.apache.pulsar.client.impl.schema.DoubleSchema;
-import org.apache.pulsar.client.impl.schema.FloatSchema;
-import org.apache.pulsar.client.impl.schema.IntSchema;
-import org.apache.pulsar.client.impl.schema.LongSchema;
-import org.apache.pulsar.client.impl.schema.ShortSchema;
-import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.impl.schema.TimeSchema;
-import org.apache.pulsar.client.impl.schema.TimestampSchema;
-import org.apache.pulsar.common.api.raw.RawMessage;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@Slf4j
-public class TestPulsarPrimitiveSchemaHandler {

Review comment:
       Thanks for the reminder, added `TestPrimitiveDecoder`.







[GitHub] [pulsar] hnail commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-769689650


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547786527



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/avro/PulsarAvroRowDecoderFactory.java
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.decoder.avro;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.TimeType.TIME;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.decoder.DecoderColumnHandle;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.sql.presto.PulsarColumnHandle;
+import org.apache.pulsar.sql.presto.PulsarColumnMetadata;
+import org.apache.pulsar.sql.presto.PulsarRowDecoder;
+import org.apache.pulsar.sql.presto.PulsarRowDecoderFactory;
+
+/**
+ * PulsarRowDecoderFactory for {@link org.apache.pulsar.common.schema.SchemaType#AVRO}.
+ */
+public class PulsarAvroRowDecoderFactory implements PulsarRowDecoderFactory {
+
+    private TypeManager typeManager;
+
+    public PulsarAvroRowDecoderFactory(TypeManager typeManager) {
+        this.typeManager = typeManager;
+    }
+
+    @Override
+    public PulsarRowDecoder createRowDecoder(TopicName topicName, SchemaInfo schemaInfo,
+                                             Set<DecoderColumnHandle> columns) {
+        return new PulsarAvroRowDecoder((GenericAvroSchema) GenericAvroSchema.of(schemaInfo), columns);
+    }
+
+    @Override
+    public List<ColumnMetadata> extractColumnMetadata(TopicName topicName, SchemaInfo schemaInfo,
+                                                      PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
+        String schemaJson = new String(schemaInfo.getSchema());
+        if (StringUtils.isBlank(schemaJson)) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+        Schema schema;
+        try {
+            schema = GenericJsonSchema.of(schemaInfo).getAvroSchema();
+        } catch (SchemaParseException ex) {
+            throw new PrestoException(NOT_SUPPORTED, "Topic "
+                    + topicName.toString() + " does not have a valid schema");
+        }
+
+        //TODO : check schema cyclic definitions which may case java.lang.StackOverflowError
+
+        return schema.getFields().stream()
+                .map(field ->
+                        new PulsarColumnMetadata(field.name(), parseAvroPrestoType(
+                                field.name(), field.schema()), field.schema().toString(), null, false, false,
+                                handleKeyValueType, new PulsarColumnMetadata.DecoderExtraInfo(field.name(),
+                                null, null))
+
+                ).collect(toList());
+    }
+
+    private Type parseAvroPrestoType(String fieldname, Schema schema) {
+        Schema.Type type = schema.getType();
+        LogicalType logicalType  = schema.getLogicalType();
+        switch (type) {
+            case STRING:
+            case ENUM:
+                return createUnboundedVarcharType();
+            case NULL:
+                throw new UnsupportedOperationException(
+                        format("field '%s' NULL type code should not be reached,"
+                                + "please check the schema or report the bug.", fieldname));
+            case FIXED:
+            case BYTES:
+                //TODO: support decimal logicalType

Review comment:
       Yes. The Presto Kafka connector and Spark don't support it either:
   - kafka https://prestosql.io/docs/current/connector/kafka.html#avro-encoder
   - spark https://github.com/databricks/spark-avro/blob/branch-4.0/README-for-old-spark-versions.md
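To make the decimal discussion concrete, the sketch below shows a primitive-only Avro-to-Presto type mapping in the spirit of `parseAvroPrestoType`. It assumes, as the TODO suggests, that `bytes`/`fixed` values annotated with the `decimal` logical type fall through to `varbinary`. `AvroPrimitive` is a hypothetical stand-in for `org.apache.avro.Schema.Type`. Presto types are returned as plain type names so the example runs without Avro or Presto on the classpath. Complex types (record, array, map, union) are deliberately left out.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.avro.Schema.Type (primitives only).
enum AvroPrimitive { STRING, ENUM, NULL, BYTES, FIXED, INT, LONG, FLOAT, DOUBLE, BOOLEAN }

final class AvroToPrestoTypes {
    private AvroToPrestoTypes() {}

    // Maps an Avro primitive plus an optional logical-type name to a Presto type name.
    static String toPrestoTypeName(String field, AvroPrimitive type, Optional<String> logicalType) {
        String logical = logicalType.orElse("");
        switch (type) {
            case STRING:
            case ENUM:
                return "varchar";
            case NULL:
                throw new UnsupportedOperationException(
                        String.format("field '%s' NULL type code should not be reached", field));
            case BYTES:
            case FIXED:
                // "decimal" is not interpreted: the raw bytes are exposed as varbinary.
                return "varbinary";
            case INT:
                if (logical.equals("date")) {
                    return "date";
                }
                if (logical.equals("time-millis")) {
                    return "time";
                }
                return "integer";
            case LONG:
                if (logical.equals("timestamp-millis")) {
                    return "timestamp";
                }
                return "bigint";
            case FLOAT:
                return "real";
            case DOUBLE:
                return "double";
            case BOOLEAN:
                return "boolean";
            default:
                throw new IllegalArgumentException("unhandled Avro type " + type);
        }
    }
}
```

Adding decimal support would mean reading the `precision`/`scale` properties from the Avro schema and mapping them to a Presto `decimal(p, s)` type, which the thread defers.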







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547763797



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,
+                    schemaInfo,
+                    columnHandles.stream()
+                            .filter(col -> !col.isInternal())
+                            .filter(col -> PulsarColumnHandle.HandleKeyValueType.KEY
+                                    .equals(col.getHandleKeyValueType()))
+                            .collect(toImmutableSet()));
+
+            PulsarRowDecoder messageDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       Same as above. I propose optimizing this in a separate PR.
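One possible direction for that follow-up is to split the projected columns into key and value subsets once, when the cursor is created, rather than re-streaming every column handle for each message. The sketch below assumes this. `KeyValueRole` and `ColumnRef` are hypothetical stand-ins for `PulsarColumnHandle.HandleKeyValueType` and `PulsarColumnHandle` and keep only the attributes the filter reads.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for PulsarColumnHandle.HandleKeyValueType.
enum KeyValueRole { KEY, VALUE }

// Hypothetical stand-in for PulsarColumnHandle.
final class ColumnRef {
    final String name;
    final boolean internal;
    final KeyValueRole role;

    ColumnRef(String name, boolean internal, KeyValueRole role) {
        this.name = name;
        this.internal = internal;
        this.role = role;
    }
}

// Computes the key/value column subsets once so the per-message path can reuse them.
final class KeyValueColumns {
    final Set<ColumnRef> keyColumns;
    final Set<ColumnRef> valueColumns;

    KeyValueColumns(Set<ColumnRef> columns) {
        this.keyColumns = select(columns, KeyValueRole.KEY);
        this.valueColumns = select(columns, KeyValueRole.VALUE);
    }

    private static Set<ColumnRef> select(Set<ColumnRef> columns, KeyValueRole role) {
        return Collections.unmodifiableSet(columns.stream()
                .filter(c -> !c.internal)    // internal (metadata) columns are not decoded from the payload
                .filter(c -> c.role == role) // keep only the requested side of the key/value pair
                .collect(Collectors.toCollection(LinkedHashSet::new)));
    }
}
```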







[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532605629



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -128,9 +117,91 @@ public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (isInternal ? 1 : 0);
         result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() : 0);
-        result = 31 * result + Arrays.hashCode(fieldNames);
-        result = 31 * result + Arrays.hashCode(positionIndices);

Review comment:
       Is it necessary to include the `DecoderExtraInfo` in the hash code computation?







[GitHub] [pulsar] gaoran10 commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-770152251


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hnail commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-750006123


   > @hnail Great work. Could you rebase the branch with apache/master first?
   
   @gaoran10 Thanks a lot for your review and help with this PR. I have made the changes you suggested. Could you take another look?





[GitHub] [pulsar] hnail commented on pull request #8422: [PIP][SQL]Migrate SchemaHandle to Presto-decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-724739246


   /pulsarbot run-failure-checks





[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532605317



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -114,12 +109,6 @@ public boolean equals(Object o) {
         if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
             return false;
         }
-        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
-            return false;
-        }
-        if (!Arrays.deepEquals(positionIndices, that.positionIndices)) {
-            return false;
-        }

Review comment:
       Is it necessary to compare the `DecoderExtraInfo`?
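If `DecoderExtraInfo` is compared in `equals`, it should also be mixed into `hashCode` so that the two stay aligned (equal objects must produce equal hash codes). The sketch below shows that pairing. It assumes `DecoderExtraInfo` holds three nullable strings. The field names `mapping`, `dataFormat` and `formatHint` are guesses modelled on Presto's `DecoderColumnHandle`. `ColumnMetadataSketch` is a hypothetical, trimmed-down stand-in for `PulsarColumnMetadata`, which in the PR also extends `ColumnMetadata` and starts from `super.hashCode()`.

```java
import java.util.Objects;

// Hypothetical shape of PulsarColumnMetadata.DecoderExtraInfo (field names assumed).
final class DecoderExtraInfo {
    private final String mapping;
    private final String dataFormat;
    private final String formatHint;

    DecoderExtraInfo(String mapping, String dataFormat, String formatHint) {
        this.mapping = mapping;
        this.dataFormat = dataFormat;
        this.formatHint = formatHint;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DecoderExtraInfo)) {
            return false;
        }
        DecoderExtraInfo that = (DecoderExtraInfo) o;
        return Objects.equals(mapping, that.mapping)
                && Objects.equals(dataFormat, that.dataFormat)
                && Objects.equals(formatHint, that.formatHint);
    }

    @Override
    public int hashCode() {
        return Objects.hash(mapping, dataFormat, formatHint); // null-safe
    }
}

// Whatever equals() compares, hashCode() also mixes in.
final class ColumnMetadataSketch {
    private final String nameWithCase;
    private final boolean isInternal;
    private final DecoderExtraInfo decoderExtraInfo;

    ColumnMetadataSketch(String nameWithCase, boolean isInternal, DecoderExtraInfo decoderExtraInfo) {
        this.nameWithCase = nameWithCase;
        this.isInternal = isInternal;
        this.decoderExtraInfo = decoderExtraInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ColumnMetadataSketch)) {
            return false;
        }
        ColumnMetadataSketch that = (ColumnMetadataSketch) o;
        return isInternal == that.isInternal
                && Objects.equals(nameWithCase, that.nameWithCase)
                && Objects.equals(decoderExtraInfo, that.decoderExtraInfo);
    }

    @Override
    public int hashCode() {
        int result = Objects.hashCode(nameWithCase);
        result = 31 * result + (isInternal ? 1 : 0);
        result = 31 * result + Objects.hashCode(decoderExtraInfo);
        return result;
    }
}
```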







[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547763449



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       Same as above. I propose optimizing this in a separate PR.
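The hunk above blocks on the schema lookup and wraps both `InterruptedException` and `ExecutionException` in a `RuntimeException`. A common refinement restores the thread's interrupt flag before rethrowing and unwraps the execution failure so that the lookup's own exception surfaces. The sketch below assumes the lookup returns a `CompletableFuture`. `FutureUtil.await` is a hypothetical helper name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureUtil {
    private FutureUtil() {}

    // Hypothetical helper: waits for the future and rethrows failures unchecked.
    static <T> T await(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers (e.g. a cancelled query) can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the schema", e);
        } catch (ExecutionException e) {
            // Report the lookup's own exception rather than the ExecutionException wrapper.
            throw new RuntimeException(e.getCause());
        }
    }
}
```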







[GitHub] [pulsar] gaoran10 commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r532660123



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       It seems that a decoder cache keyed by schema version could be added, so that decoders are reused across messages instead of being recreated for each one.
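A minimal sketch of such a cache follows. It assumes each message exposes its schema version as a non-null `byte[]`, and that building a decoder depends only on that version, because the projected columns are fixed for the lifetime of a cursor. Since arrays use identity equality, the version is wrapped in a value key. `SchemaVersionKey` and `DecoderCache` are hypothetical names. In the `KEY_VALUE` case, the cached value `D` could be a small holder for the key and value decoders.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// byte[] compares by identity, so wrap the version in a key with value semantics.
final class SchemaVersionKey {
    private final byte[] version;

    SchemaVersionKey(byte[] version) {
        this.version = Objects.requireNonNull(version, "schema version").clone();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SchemaVersionKey && Arrays.equals(version, ((SchemaVersionKey) o).version);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(version);
    }
}

// Builds at most one decoder per schema version and reuses it for later messages.
final class DecoderCache<D> {
    private final Map<SchemaVersionKey, D> decoders = new ConcurrentHashMap<>();
    private final Function<byte[], D> factory;

    DecoderCache(Function<byte[], D> factory) {
        this.factory = factory;
    }

    D get(byte[] schemaVersion) {
        // computeIfAbsent invokes the factory only when this version is not cached yet.
        return decoders.computeIfAbsent(new SchemaVersionKey(schemaVersion),
                key -> factory.apply(schemaVersion));
    }

    int size() {
        return decoders.size();
    }
}
```

`ConcurrentHashMap` is used defensively. A cursor that is only driven by one thread could use a plain `HashMap` instead.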







[GitHub] [pulsar] jiazhai commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-770916795


   @hnail  Thanks a lot for this great feature.





[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547763449



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -420,17 +435,92 @@ public boolean advanceNextPosition() {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
-            ByteBuf keyByteBuf = null;
+        SchemaInfo schemaInfo;
+        try {
+            schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
+
+        if (schemaInfo.getType().equals(SchemaType.KEY_VALUE)) {
+
+            PulsarRowDecoder keyDecoder = decoderFactory.createRowDecoder(topicName,

Review comment:
       Same as above. I propose optimizing this in a separate PR.







[GitHub] [pulsar] gaoran10 commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-766363539


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hnail commented on a change in pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
hnail commented on a change in pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#discussion_r547755022



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
##########
@@ -114,12 +109,6 @@ public boolean equals(Object o) {
         if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
             return false;
         }
-        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
-            return false;
-        }
-        if (!Arrays.deepEquals(positionIndices, that.positionIndices)) {
-            return false;
-        }

Review comment:
       Thanks for the reminder; fixed as suggested.







[GitHub] [pulsar] jiazhai commented on pull request #8422: [PIP-71][SQL]Pulsar SQL migrate SchemaHandle to presto decoder

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #8422:
URL: https://github.com/apache/pulsar/pull/8422#issuecomment-768774231


   /pulsarbot run-failure-checks

