You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/21 00:22:21 UTC
[incubator-pulsar] branch master updated: [schema] Add ByteBuf
schema and fix ByteBuffer schema (#2624)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1199d0 [schema] Add ByteBuf schema and fix ByteBuffer schema (#2624)
c1199d0 is described below
commit c1199d03ef5ce96451e7d2845d1dc0185d1b5e44
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Sep 20 17:22:16 2018 -0700
[schema] Add ByteBuf schema and fix ByteBuffer schema (#2624)
*Motivation*
ByteBuffer is a variant of `bytes`. so it should be `SchemaType.BYTES`, not `SchemaType.BYTEBUFFER`
*Changes*
- Fix bytebuffer schema type
- Add bytebuf schema
---
.../{ByteBufferSchema.java => ByteBufSchema.java} | 40 ++++++++--------------
.../client/impl/schema/ByteBufferSchema.java | 4 +--
.../pulsar/client/schema/PrimitiveSchemaTest.java | 12 ++++---
.../apache/pulsar/common/schema/SchemaType.java | 5 ---
tests/scripts/pre-integ-tests.sh | 2 +-
5 files changed, 26 insertions(+), 37 deletions(-)
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
similarity index 61%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index ee8ba66..4e7e6d0 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -18,53 +18,43 @@
*/
package org.apache.pulsar.client.impl.schema;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
- * A bytebuffer schema.
+ * A variant `Bytes` schema that takes {@link io.netty.buffer.ByteBuf}.
*/
-public class ByteBufferSchema implements Schema<ByteBuffer> {
+public class ByteBufSchema implements Schema<ByteBuf> {
- public static ByteBufferSchema of() {
+ public static ByteBufSchema of() {
return INSTANCE;
}
- private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
+ private static final ByteBufSchema INSTANCE = new ByteBufSchema();
private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
- .setName("ByteBuffer")
- .setType(SchemaType.BYTEBUFFER)
+ .setName("ByteBuf")
+ .setType(SchemaType.BYTES)
.setSchema(new byte[0]);
@Override
- public byte[] encode(ByteBuffer data) {
- if (data == null) {
+ public byte[] encode(ByteBuf message) {
+ if (message == null) {
return null;
}
- data.rewind();
-
- if (data.hasArray()) {
- byte[] arr = data.array();
- if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
- return arr;
- }
- }
-
- byte[] ret = new byte[data.remaining()];
- data.get(ret, 0, ret.length);
- data.rewind();
- return ret;
+ return ByteBufUtil.getBytes(message);
}
@Override
- public ByteBuffer decode(byte[] data) {
- if (null == data) {
+ public ByteBuf decode(byte[] bytes) {
+ if (null == bytes) {
return null;
} else {
- return ByteBuffer.wrap(data);
+ return Unpooled.wrappedBuffer(bytes);
}
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index ee8ba66..251cd93 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
- * A bytebuffer schema.
+ * A bytebuffer schema is effectively a `BYTES` schema.
*/
public class ByteBufferSchema implements Schema<ByteBuffer> {
@@ -35,7 +35,7 @@ public class ByteBufferSchema implements Schema<ByteBuffer> {
private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
.setName("ByteBuffer")
- .setType(SchemaType.BYTEBUFFER)
+ .setType(SchemaType.BYTES)
.setSchema(new byte[0]);
@Override
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
index fdac539..aac2d8c 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.client.schema;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
+import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,6 +30,7 @@ import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ByteBufSchema;
import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
import org.apache.pulsar.client.impl.schema.ByteSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -55,8 +58,9 @@ public class PrimitiveSchemaTest {
put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
- put(BytesSchema.of(), Arrays.asList("my string".getBytes()));
- put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+ put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
+ put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+ put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
}
};
@@ -94,8 +98,8 @@ public class PrimitiveSchemaTest {
assertEquals(SchemaType.DOUBLE, DoubleSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.STRING, StringSchema.utf8().getSchemaInfo().getType());
assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
- assertEquals(SchemaType.BYTEBUFFER, ByteBufferSchema.of().getSchemaInfo().getType());
-
+ assertEquals(SchemaType.BYTES, ByteBufferSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.BYTES, ByteBufSchema.of().getSchemaInfo().getType());
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 87c0956..a19d587 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -68,11 +68,6 @@ public enum SchemaType {
BYTES,
/**
- * A bytebuffer.
- */
- BYTEBUFFER,
-
- /**
* JSON object encoding and validation
*/
JSON,
diff --git a/tests/scripts/pre-integ-tests.sh b/tests/scripts/pre-integ-tests.sh
index ab83c53..1a465db 100755
--- a/tests/scripts/pre-integ-tests.sh
+++ b/tests/scripts/pre-integ-tests.sh
@@ -24,7 +24,7 @@ ulimit -a
pwd
df -h
ps -eo euser,pid,ppid,pgid,start,pcpu,pmem,cmd
-docker network prune -f --filter name=pulsarnet_*
+docker system prune -f
docker system events > docker.debug-info & echo $! > docker-log.pid
docker pull apachepulsar/s3mock:latest
docker pull alpine/socat:latest