You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/21 00:22:21 UTC

[incubator-pulsar] branch master updated: [schema] Add ByteBuf schema and fix ByteBuffer schema (#2624)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1199d0  [schema] Add ByteBuf schema and fix ByteBuffer schema (#2624)
c1199d0 is described below

commit c1199d03ef5ce96451e7d2845d1dc0185d1b5e44
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Sep 20 17:22:16 2018 -0700

    [schema] Add ByteBuf schema and fix ByteBuffer schema (#2624)
    
    *Motivation*
    
    `ByteBuffer` is a variant of `bytes`, so its schema type should be `SchemaType.BYTES`, not `SchemaType.BYTEBUFFER`.
    
    *Changes*
    
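    - Fix the `ByteBufferSchema` schema type: it now reports `SchemaType.BYTES`, and `SchemaType.BYTEBUFFER` is removed
    - Add `ByteBufSchema`, a `BYTES` schema for Netty `ByteBuf`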
---
 .../{ByteBufferSchema.java => ByteBufSchema.java}  | 40 ++++++++--------------
 .../client/impl/schema/ByteBufferSchema.java       |  4 +--
 .../pulsar/client/schema/PrimitiveSchemaTest.java  | 12 ++++---
 .../apache/pulsar/common/schema/SchemaType.java    |  5 ---
 tests/scripts/pre-integ-tests.sh                   |  2 +-
 5 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
similarity index 61%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index ee8ba66..4e7e6d0 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -18,53 +18,43 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
- * A bytebuffer schema.
+ * A variant `Bytes` schema that takes {@link io.netty.buffer.ByteBuf}.
  */
-public class ByteBufferSchema implements Schema<ByteBuffer> {
+public class ByteBufSchema implements Schema<ByteBuf> {
 
-    public static ByteBufferSchema of() {
+    public static ByteBufSchema of() {
         return INSTANCE;
     }
 
-    private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
+    private static final ByteBufSchema INSTANCE = new ByteBufSchema();
     private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
-        .setName("ByteBuffer")
-        .setType(SchemaType.BYTEBUFFER)
+        .setName("ByteBuf")
+        .setType(SchemaType.BYTES)
         .setSchema(new byte[0]);
 
     @Override
-    public byte[] encode(ByteBuffer data) {
-        if (data == null) {
+    public byte[] encode(ByteBuf message) {
+        if (message == null) {
             return null;
         }
 
-        data.rewind();
-
-        if (data.hasArray()) {
-            byte[] arr = data.array();
-            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
-                return arr;
-            }
-        }
-
-        byte[] ret = new byte[data.remaining()];
-        data.get(ret, 0, ret.length);
-        data.rewind();
-        return ret;
+        return ByteBufUtil.getBytes(message);
     }
 
     @Override
-    public ByteBuffer decode(byte[] data) {
-        if (null == data) {
+    public ByteBuf decode(byte[] bytes) {
+        if (null == bytes) {
             return null;
         } else {
-            return ByteBuffer.wrap(data);
+            return Unpooled.wrappedBuffer(bytes);
         }
     }
 
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index ee8ba66..251cd93 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 /**
- * A bytebuffer schema.
+ * A bytebuffer schema is effectively a `BYTES` schema.
  */
 public class ByteBufferSchema implements Schema<ByteBuffer> {
 
@@ -35,7 +35,7 @@ public class ByteBufferSchema implements Schema<ByteBuffer> {
     private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
     private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
         .setName("ByteBuffer")
-        .setType(SchemaType.BYTEBUFFER)
+        .setType(SchemaType.BYTES)
         .setSchema(new byte[0]);
 
     @Override
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
index fdac539..aac2d8c 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.client.schema;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 
+import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -28,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ByteBufSchema;
 import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
 import org.apache.pulsar.client.impl.schema.ByteSchema;
 import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -55,8 +58,9 @@ public class PrimitiveSchemaTest {
             put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
             put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
             put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
-            put(BytesSchema.of(), Arrays.asList("my string".getBytes()));
-            put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+            put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
+            put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+            put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
         }
     };
 
@@ -94,8 +98,8 @@ public class PrimitiveSchemaTest {
         assertEquals(SchemaType.DOUBLE, DoubleSchema.of().getSchemaInfo().getType());
         assertEquals(SchemaType.STRING, StringSchema.utf8().getSchemaInfo().getType());
         assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
-        assertEquals(SchemaType.BYTEBUFFER, ByteBufferSchema.of().getSchemaInfo().getType());
-
+        assertEquals(SchemaType.BYTES, ByteBufferSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.BYTES, ByteBufSchema.of().getSchemaInfo().getType());
     }
 
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 87c0956..a19d587 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -68,11 +68,6 @@ public enum SchemaType {
     BYTES,
 
     /**
-     * A bytebuffer.
-     */
-    BYTEBUFFER,
-
-    /**
      * JSON object encoding and validation
      */
     JSON,
diff --git a/tests/scripts/pre-integ-tests.sh b/tests/scripts/pre-integ-tests.sh
index ab83c53..1a465db 100755
--- a/tests/scripts/pre-integ-tests.sh
+++ b/tests/scripts/pre-integ-tests.sh
@@ -24,7 +24,7 @@ ulimit -a
 pwd
 df -h
 ps -eo euser,pid,ppid,pgid,start,pcpu,pmem,cmd
-docker network prune -f --filter name=pulsarnet_*
+docker system prune -f
 docker system events > docker.debug-info & echo $! > docker-log.pid
 docker pull apachepulsar/s3mock:latest
 docker pull alpine/socat:latest