You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/11 07:23:53 UTC

[pulsar] branch master updated: Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aaed24  Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)
3aaed24 is described below

commit 3aaed249f5c200431f5e8dacad2de2cbd64ee1ad
Author: ran <ga...@126.com>
AuthorDate: Mon May 11 15:23:38 2020 +0800

    Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)
    
    Fixes #6749
    
    ### Motivation
    
    Currently, Pulsar SQL cannot read topics whose Avro schema uses `ByteBuffer` as a field type, for example one produced with the POJO class below.
    
    ```
    @Data
    public static class LogFile {
        int id;
        String name;
        ByteBuffer data;
    }
    
    Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
    ```
    
    Error Log
    ```
    2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"defaul [...]
    java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
    	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
    	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
    	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
    	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
    	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
    	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
    	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
    	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
    	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
    	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Modifications
    
    When the Presto field type is `VarbinaryType.VARBINARY`, check whether the record is a `ByteBuffer`, `byte[]`, `ByteBuf`, or something else, and convert the field value according to that type.
    
    * Pulsar SQL now supports the Avro schema `ByteBuffer` type.
    
    * Add a `ByteBuf` check and unit tests.
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 30 ++++++++++++-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 51 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 97ce023..003c51f 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -42,6 +42,7 @@ import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -513,12 +514,39 @@ public class PulsarRecordCursor implements RecordCursor {
         if (type == VarcharType.VARCHAR) {
             return Slices.utf8Slice(record.toString());
         } else if (type == VarbinaryType.VARBINARY) {
-            return Slices.wrappedBuffer((byte[]) record);
+            return Slices.wrappedBuffer(toBytes(record));
         } else {
             throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + type);
         }
     }
 
+    /** Converts a VARBINARY value to a byte[]; buffers are read without moving their indices. */
+    private byte[] toBytes(Object record) {
+        if (record == null || record instanceof byte[]) {
+            return (byte[]) record; // null stays null
+        } else if (record instanceof ByteBuffer) {
+            ByteBuffer view = ((ByteBuffer) record).duplicate(); // caller's position/limit untouched
+            if (!view.hasArray()) {
+                view.flip(); // NOTE(review): no-array buffers read [0, position); heap reads [position, limit)
+            } else if (view.arrayOffset() == 0 && view.position() == 0 && view.limit() == view.array().length) {
+                return view.array(); // readable region is the whole backing array: zero-copy
+            }
+            byte[] bytes = new byte[view.remaining()];
+            view.get(bytes);
+            return bytes;
+        } else if (record instanceof ByteBuf) {
+            ByteBuf byteBuf = (ByteBuf) record;
+            if (byteBuf.hasArray() && byteBuf.arrayOffset() == 0 && byteBuf.readerIndex() == 0
+                    && byteBuf.writerIndex() == byteBuf.array().length) {
+                return byteBuf.array(); // readable region is the whole backing array: zero-copy
+            }
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes); // absolute read: readerIndex untouched
+            return bytes;
+        }
+        throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + record.getClass().getName());
+    }
+
     @Override
     public Object getObject(int field) {
         throw new UnsupportedOperationException();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index c81fc40..c2a3371 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,14 +19,24 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.pulsar.common.naming.TopicName;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
 public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -129,4 +139,45 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
             pulsarRecordCursor.close();
         }
     }
+
+    @Test
+    public void testRecordToBytes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        // toBytes is private (reached via reflection) and reads no instance state, so a mock receiver works.
+        PulsarRecordCursor pulsarRecordCursor = Mockito.mock(PulsarRecordCursor.class);
+        Method method = PulsarRecordCursor.class.getDeclaredMethod("toBytes", Object.class);
+        method.setAccessible(true);
+
+        final String msg = "Hello!";
+        final byte[] msgBytes = msg.getBytes();
+        assertToBytesResult(method.invoke(pulsarRecordCursor, msgBytes.clone()), msg);
+
+        ByteBuffer heapBuffer = ByteBuffer.wrap(msgBytes.clone());
+        assertTrue(heapBuffer.hasArray());
+        assertToBytesResult(method.invoke(pulsarRecordCursor, heapBuffer), msg);
+
+        // Written but not flipped: toBytes reads a direct buffer as [0, position).
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(msgBytes.length);
+        directBuffer.put(msgBytes);
+        assertFalse(directBuffer.hasArray());
+        assertToBytesResult(method.invoke(pulsarRecordCursor, directBuffer), msg);
+
+        ByteBuf heapByteBuf = Unpooled.wrappedBuffer(msgBytes.clone());
+        ByteBuf directByteBuf = Unpooled.directBuffer().writeBytes(msgBytes);
+        try {
+            assertTrue(heapByteBuf.hasArray());
+            assertToBytesResult(method.invoke(pulsarRecordCursor, heapByteBuf), msg);
+            assertFalse(directByteBuf.hasArray());
+            assertToBytesResult(method.invoke(pulsarRecordCursor, directByteBuf), msg);
+        } finally {
+            // Netty ByteBufs are reference-counted; release what this test allocated.
+            heapByteBuf.release();
+            directByteBuf.release();
+        }
+    }
+
+    private static void assertToBytesResult(Object result, String expected) {
+        assertNotNull(result);
+        assertEquals(new String((byte[]) result), expected);
+    }
+
 }