You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:57 UTC
[pulsar] 06/17: Pulsar SQL Support Avro Schema `ByteBuffer` Type
(#6925)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e7b8e3d5efcb2f7f84ff8f89960ffb40cc67d3ff
Author: ran <ga...@126.com>
AuthorDate: Mon May 11 15:23:38 2020 +0800
Pulsar SQL Support Avro Schema `ByteBuffer` Type (#6925)
Fixes #6749
### Motivation
Currently, Pulsar SQL cannot query topics whose Avro schema uses `ByteBuffer` as a field type. For example, producing with the following POJO class:
```
@Data
public static class LogFile {
int id;
String name;
// Decoded records carry this field as a java.nio.HeapByteBuffer (see the
// ClassCastException below), not as the byte[] that getSlice() cast it to.
ByteBuffer data;
}
Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
```
Error Log
```
2020-05-08T23:34:47.079+0800 ERROR SplitRunner-5-101 com.facebook.presto.execution.executor.TaskExecutor Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"defaul [...]
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Modifications
When the Presto field type is `VarbinaryType.VARBINARY`, check whether the record is a `ByteBuffer`, a `byte[]`, a `ByteBuf`, or something else, and convert it to a byte array accordingly (any other type is rejected as unsupported).
* Pulsar SQL: support the Avro schema `ByteBuffer` type.
* Add a `ByteBuf` check and unit tests.
(cherry picked from commit 3aaed249f5c200431f5e8dacad2de2cbd64ee1ad)
---
.../pulsar/sql/presto/PulsarRecordCursor.java | 30 ++++++++++++-
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 51 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 6899f97..6ac16b3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -42,6 +42,7 @@ import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -513,12 +514,39 @@ public class PulsarRecordCursor implements RecordCursor {
if (type == VarcharType.VARCHAR) {
return Slices.utf8Slice(record.toString());
} else if (type == VarbinaryType.VARBINARY) {
- return Slices.wrappedBuffer((byte[]) record);
+ return Slices.wrappedBuffer(toBytes(record));
} else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + type);
}
}
+    /**
+     * Converts a VARBINARY field value into a byte array suitable for {@code Slices.wrappedBuffer}.
+     *
+     * <p>Accepted inputs:
+     * <ul>
+     *   <li>{@code byte[]} (or {@code null}) - returned as-is;</li>
+     *   <li>heap {@link ByteBuffer} - the bytes between {@code position} and {@code limit};</li>
+     *   <li>direct {@link ByteBuffer} - the bytes between 0 and {@code position};</li>
+     *   <li>{@link ByteBuf} - the readable bytes ({@code readerIndex} to {@code writerIndex}).</li>
+     * </ul>
+     * A backing array is returned without copying only when it holds exactly the payload;
+     * otherwise the payload is copied. The position/limit/indices of the input are not modified.
+     *
+     * @param record the decoded field value
+     * @return the field's bytes, or {@code null} when {@code record} is {@code null}
+     * @throws PrestoException with {@code NOT_SUPPORTED} for any other record type
+     */
+    private byte[] toBytes(Object record) {
+        if (record instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) record;
+            if (byteBuffer.hasArray()) {
+                // The backing array can be larger than the payload (non-zero arrayOffset for
+                // slices/wrapped ranges, capacity beyond limit), so honour the buffer window
+                // instead of returning array() blindly.
+                return arrayRange(byteBuffer.array(),
+                        byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
+            }
+            // NOTE(review): unlike the heap branch, this treats [0, position) as the payload,
+            // i.e. it assumes the buffer is still in write mode (the way testRecordToBytes builds
+            // it). A buffer already flipped for reading would yield an empty array here; confirm
+            // the intended convention before changing it. Working on a duplicate() leaves the
+            // caller's position and limit untouched.
+            ByteBuffer view = byteBuffer.duplicate();
+            view.flip();
+            byte[] bytes = new byte[view.remaining()];
+            view.get(bytes);
+            return bytes;
+        } else if (record instanceof ByteBuf) {
+            ByteBuf byteBuf = (ByteBuf) record;
+            if (byteBuf.hasArray()) {
+                return arrayRange(byteBuf.array(),
+                        byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
+            }
+            // getBytes(index, dst) does not advance readerIndex (unlike readBytes), so reading
+            // the same field again still sees the data.
+            byte[] bytes = new byte[byteBuf.readableBytes()];
+            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
+            return bytes;
+        } else if (record == null || record instanceof byte[]) {
+            // null is passed through unchanged, exactly as the previous plain cast did.
+            return (byte[]) record;
+        }
+        throw new PrestoException(NOT_SUPPORTED, "Unsupported type " + record.getClass().getName());
+    }
+
+    /**
+     * Returns the {@code length} bytes of {@code array} starting at {@code offset}, reusing
+     * {@code array} itself when that range covers it exactly and copying otherwise.
+     */
+    private static byte[] arrayRange(byte[] array, int offset, int length) {
+        if (offset == 0 && length == array.length) {
+            return array;
+        }
+        byte[] bytes = new byte[length];
+        System.arraycopy(array, offset, bytes, 0, length);
+        return bytes;
+    }
+
@Override
public Object getObject(int field) {
throw new UnsupportedOperationException();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index c81fc40..c2a3371 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,14 +19,24 @@
package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.pulsar.common.naming.TopicName;
+import org.mockito.Mock;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -129,4 +139,45 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
pulsarRecordCursor.close();
}
}
+
+    /**
+     * Exercises the private {@code toBytes} conversion via reflection with every supported input
+     * shape: a plain byte array, heap and direct {@link ByteBuffer}s, and heap and direct
+     * {@link ByteBuf}s.
+     */
+    @Test
+    public void testRecordToBytes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        PulsarRecordCursor pulsarRecordCursor = Mockito.mock(PulsarRecordCursor.class);
+        Method method = PulsarRecordCursor.class.getDeclaredMethod("toBytes", Object.class);
+        method.setAccessible(true);
+
+        final String msg = "Hello!";
+
+        byte[] bytes = msg.getBytes();
+        Object obj = method.invoke(pulsarRecordCursor, bytes);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuffer byteBuffer1 = ByteBuffer.wrap(msg.getBytes());
+        assertTrue(byteBuffer1.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuffer1);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        ByteBuffer byteBuffer2 = ByteBuffer.allocateDirect(msg.getBytes().length);
+        byteBuffer2.put(msg.getBytes());
+        assertFalse(byteBuffer2.hasArray());
+        obj = method.invoke(pulsarRecordCursor, byteBuffer2);
+        assertNotNull(obj);
+        assertEquals(new String((byte[]) obj), msg);
+
+        // ByteBufs are reference counted: release them even when an assertion fails, otherwise
+        // the direct buffer's native memory is only reclaimed (if at all) outside our control.
+        ByteBuf byteBuf1 = Unpooled.wrappedBuffer(msg.getBytes());
+        ByteBuf byteBuf2 = Unpooled.directBuffer();
+        try {
+            assertTrue(byteBuf1.hasArray());
+            obj = method.invoke(pulsarRecordCursor, byteBuf1);
+            assertNotNull(obj);
+            assertEquals(new String((byte[]) obj), msg);
+
+            byteBuf2.writeBytes(msg.getBytes());
+            assertFalse(byteBuf2.hasArray());
+            obj = method.invoke(pulsarRecordCursor, byteBuf2);
+            assertNotNull(obj);
+            assertEquals(new String((byte[]) obj), msg);
+        } finally {
+            byteBuf1.release();
+            byteBuf2.release();
+        }
+    }
+
}