You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/06 09:40:55 UTC

[pulsar] branch master updated: Fix pulsar sql issues when run select count(*) for the table with primary schema (#10840)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a1076a  Fix pulsar sql issues when run select count(*) for the table with primary schema (#10840)
4a1076a is described below

commit 4a1076a4b5f088d75c7284fbf419ce87939a2674
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jun 6 17:40:07 2021 +0800

    Fix pulsar sql issues when run select count(*) for the table with primary schema (#10840)
    
    error log:
    
    ```
    2021-06-06T14:36:41.669+0800 ERROR SplitRunner-3-106 io.prestosql.execution.executor.TaskExecutor Error processing Split 20210606_063634_00002_6j8bm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='Bytes', schemaName='public/default', tableName='my-topic', splitSize=39324, schema='', schemaType=BYTES, startPositionEntryId=39325, endPositionEntryId=78649, startPositionLedgerId=10, endPositionLedgerId=10, schemaInfoProperties={}} (start = 2.24098441109938E8, wall = 3 [...]
    java.lang.RuntimeException: Primitive type must has only one ColumnHandle.
     at org.apache.pulsar.sql.presto.decoder.primitive.PulsarPrimitiveRowDecoderFactory.createRowDecoder(PulsarPrimitiveRowDecoderFactory.java:67)
     at org.apache.pulsar.sql.presto.PulsarDispatchingRowDecoderFactory.createRowDecoder(PulsarDispatchingRowDecoderFactory.java:59)
     at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:545)
     at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
     at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
     at io.prestosql.operator.Driver.processInternal(Driver.java:379)
     at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
     at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
     at io.prestosql.operator.Driver.processFor(Driver.java:276)
     at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
     at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
     at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
     at io.prestosql.$gen.Presto_332__testversion____20210606_063438_2.run(Unknown Source)
     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at java.base/java.lang.Thread.run(Thread.java:834)
     ```
---
 .../sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java    | 3 +++
 .../presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java | 3 ++-
 .../apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java  | 7 +++++++
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
index f2da259..5eb2d7f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoder.java
@@ -66,6 +66,9 @@ public class PulsarPrimitiveRowDecoder implements PulsarRowDecoder {
 
     @Override
     public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(ByteBuf byteBuf) {
+        if (columnHandle == null) {
+            return Optional.empty();
+        }
         Object value = schema.decode(byteBuf);
         Map<DecoderColumnHandle, FieldValueProvider> primitiveColumn = new HashMap<>();
         if (value == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
index a36f9fc..abe2351 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/primitive/PulsarPrimitiveRowDecoderFactory.java
@@ -64,7 +64,8 @@ public class PulsarPrimitiveRowDecoderFactory implements PulsarRowDecoderFactory
             return new PulsarPrimitiveRowDecoder((AbstractSchema<?>) AutoConsumeSchema.getSchema(schemaInfo),
                     columns.iterator().next());
         } else {
-            throw new RuntimeException("Primitive type must has only one ColumnHandle.");
+            return new PulsarPrimitiveRowDecoder((AbstractSchema<?>) AutoConsumeSchema.getSchema(schemaInfo),
+                    null);
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 821a0ab..54b82fb 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -258,6 +258,13 @@ public class TestPulsarSQLBase extends PulsarSQLTestSuite {
 
         log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size());
         assertThat(returnedTimestamps.size()).isEqualTo(0);
+
+        query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
+        log.info("Executing query: {}", query);
+        res = connection.createStatement().executeQuery(query);
+        res.next();
+        int count = res.getInt("_col0");
+        assertThat(count).isGreaterThan(messageNum - 2);
     }
 
     public ContainerExecResult execQuery(final String query) throws Exception {