You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 09:31:33 UTC

[pulsar] 10/12: [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c43d1da5cc28377a331403a5ccb7dc34f59fe13f
Author: ran <ga...@126.com>
AuthorDate: Tue Nov 16 13:16:58 2021 +0800

    [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)
    
    ### Motivation
    
    Currently, if the schema version of a message is null, Pulsar SQL fails with a NullPointerException (NPE).
    
    ### Modifications
    
    Adjust the handling of a null schema version in `PulsarRecordCursor` as follows:
    
    1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
    2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
    3. If the schema version of the message is null, use the latest schema of the topic, i.e. the schema info carried by pulsarSplit.
    4. If the schema version of the message is not null, fetch the schema of that specific version through PulsarAdmin.
    5. If the resulting schema is still null, throw a runtime exception.
    
    (cherry picked from commit e5619cffce702d9f446c27e69927148e45797b28)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 12 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index f1e2bdb..b1230d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -479,14 +480,7 @@ public class PulsarRecordCursor implements RecordCursor {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
-        try {
-            if (schemaInfo == null) {
-                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
 
         Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
 
@@ -600,6 +594,38 @@ public class PulsarRecordCursor implements RecordCursor {
         return true;
     }
 
+    /**
+     * Get the schemaInfo of the message.
+     *
+     * 1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
+     * 2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
+     * 3. If the schema version of the message is null, use the schema info of pulsarSplit.
+     * 4. If the schema version of the message is not null, get the specific version schema by PulsarAdmin.
+     * 5. If the final schema is null throw a runtime exception.
+     */
+    private SchemaInfo getSchemaInfo(PulsarSplit pulsarSplit) {
+        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
+        if (schemaInfo != null) {
+            return schemaInfo;
+        }
+        try {
+            if (this.currentMessage.getSchemaVersion() == null) {
+                schemaInfo = pulsarSplit.getSchemaInfo();
+            } else {
+                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        if (schemaInfo == null) {
+            String schemaVersion = this.currentMessage.getSchemaVersion() == null
+                    ? "null" : BytesSchemaVersion.of(this.currentMessage.getSchemaVersion()).toString();
+            throw new RuntimeException("The specific version (" + schemaVersion + ") schema of the table "
+                    + pulsarSplit.getTableName() + " is null");
+        }
+        return schemaInfo;
+    }
+
     private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String schemaName) {
         if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) {
             return null;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 3a9233c..828ceef 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -102,8 +102,13 @@ public class PulsarSqlSchemaInfoProvider implements SchemaInfoProvider {
         ClassLoader originalContextLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
-            return pulsarAdmin.schemas()
-                    .getSchemaInfo(topicName.toString(), ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+            long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
+            SchemaInfo schemaInfo = pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
+            if (schemaInfo == null) {
+                throw new RuntimeException(
+                        "The specific version (" + version + ") schema of the topic " + topicName + " is null");
+            }
+            return schemaInfo;
         } finally {
             Thread.currentThread().setContextClassLoader(originalContextLoader);
         }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 8d52183..60d3fed 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -34,18 +34,31 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -56,6 +69,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -65,6 +80,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -323,9 +339,14 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
 
                                     MessageMetadata messageMetadata =
                                             new MessageMetadata()
-                                                    .setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                                    .setProducerName("test-producer")
+                                                    .setSequenceId(positions.get(topic))
                                                     .setPublishTime(System.currentTimeMillis());
 
+                                    if (i % 2 == 0) {
+                                        messageMetadata.setSchemaVersion(new LongSchemaVersion(1L).bytes());
+                                    }
+
                                     if (KeyValueEncodingType.SEPARATED.equals(schema.getKeyValueEncodingType())) {
                                         messageMetadata
                                                 .setPartitionKey(new String(schema
@@ -380,7 +401,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         PulsarSplit split = new PulsarSplit(0, pulsarConnectorId.toString(),
                 topicName.getNamespace(), topicName.getLocalName(), topicName.getLocalName(),
                 entriesNum,
-                new String(schema.getSchemaInfo().getSchema()),
+                new String(schema.getSchemaInfo().getSchema(),  "ISO8859-1"),
                 schema.getSchemaInfo().getType(),
                 0, entriesNum,
                 0, 0, TupleDomain.all(),
@@ -416,4 +437,86 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         private Double field3;
     }
 
+    @Test
+    public void testGetSchemaInfo() throws Exception {
+        String topic = "get-schema-test";
+        PulsarSplit pulsarSplit = Mockito.mock(PulsarSplit.class);
+        Mockito.when(pulsarSplit.getTableName()).thenReturn(TopicName.get(topic).getLocalName());
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn("public/default");
+        PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class);
+        Schemas schemas = Mockito.mock(Schemas.class);
+        Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas);
+        PulsarConnectorConfig connectorConfig = spy(new PulsarConnectorConfig());
+        Mockito.when(connectorConfig.getPulsarAdmin()).thenReturn(pulsarAdmin);
+        PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                new ArrayList<>(), pulsarSplit, connectorConfig, Mockito.mock(ManagedLedgerFactory.class),
+                new ManagedLedgerConfig(), null, null));
+
+        Class<PulsarRecordCursor> clazz =  PulsarRecordCursor.class;
+        Method getSchemaInfo = clazz.getDeclaredMethod("getSchemaInfo", PulsarSplit.class);
+        getSchemaInfo.setAccessible(true);
+        Field currentMessage = clazz.getDeclaredField("currentMessage");
+        currentMessage.setAccessible(true);
+        RawMessage rawMessage = Mockito.mock(RawMessage.class);
+        currentMessage.set(pulsarRecordCursor, rawMessage);
+
+        // If the schemaType of pulsarSplit is NONE or BYTES, using bytes schema
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.NONE);
+        SchemaInfo schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.BYTES);
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn(Schema.BYTEBUFFER.getSchemaInfo().getName());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        // If the schemaVersion of the message is not null, try to get the schema.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(0).bytes());
+        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
+                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.AVRO, schemaInfo.getType());
+
+        String schemaTopic = "persistent://public/default/" + topic;
+
+        // If the schemaVersion of the message is null and the schema of pulsarSplit is null, throw runtime exception.
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(null);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The message schema version is null and the latest schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the table " + topic + " is null"));
+        }
+
+        // If the schemaVersion of the message is null, try to get the latest schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+
+        // If the specific version schema is null, throw runtime exception.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(1L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The specific version " + 1 + " schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            String schemaVersion = BytesSchemaVersion.of(new LongSchemaVersion(1L).bytes()).toString();
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the topic " + schemaTopic + " is null"));
+        }
+
+        // Get the specific version schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(2L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+    }
+
 }