You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/28 06:59:30 UTC
[pulsar] branch master updated: [pulsar-client] add
Date/Time/Timestamp schema (#3856)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bda6a9c [pulsar-client] add Date/Time/Timestamp schema (#3856)
bda6a9c is described below
commit bda6a9cc8edb0aef7d969ce618fcfe212bdb425a
Author: wpl <12...@qq.com>
AuthorDate: Thu Mar 28 01:59:25 2019 -0500
[pulsar-client] add Date/Time/Timestamp schema (#3856)
Fixes #3831
---
.gitignore | 1 +
.../java/org/apache/pulsar/client/api/Schema.java | 18 ++++++
.../client/internal/DefaultImplementation.java | 21 +++++++
.../apache/pulsar/common/schema/SchemaType.java | 18 ++++++
pulsar-client/pom.xml | 17 ++++++
.../pulsar/client/impl/schema/DateSchema.java | 65 ++++++++++++++++++++++
.../client/impl/schema/FieldSchemaBuilderImpl.java | 11 ++++
.../pulsar/client/impl/schema/TimeSchema.java | 65 ++++++++++++++++++++++
.../pulsar/client/impl/schema/TimestampSchema.java | 65 ++++++++++++++++++++++
.../pulsar/client/impl/schema/AvroSchemaTest.java | 45 ++++++++++++++-
.../client/impl/schema/PrimitiveSchemaTest.java | 12 ++++
.../src/test/resources/avro/NasaMission.avsc | 11 ++++
12 files changed, 348 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index f7194ad..1baa025 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,3 +82,4 @@ docker.debug-info
# Avro
examples/flink/src/main/java/org/apache/flink/avro/generated
pulsar-flink/src/test/java/org/apache/flink/avro/generated
+pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index d0bff30..18e02bf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.client.api;
import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -159,6 +162,21 @@ public interface Schema<T> {
Schema<Double> DOUBLE = DefaultImplementation.newDoubleSchema();
/**
+ * Date Schema
+ */
+ Schema<Date> DATE = DefaultImplementation.newDateSchema();
+
+ /**
+ * Time Schema
+ */
+ Schema<Time> TIME = DefaultImplementation.newTimeSchema();
+
+ /**
+ * Timestamp Schema
+ */
+ Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();
+
+ /**
* Create a Protobuf schema type by extracting the fields of the specified class.
*
* @param clazz the Protobuf generated class to be used to extract the schema
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 44dbc10..ccc0a15 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -27,6 +27,9 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
import java.util.Map;
import java.util.function.Supplier;
@@ -187,6 +190,24 @@ public class DefaultImplementation {
.newInstance());
}
+ public static Schema<Date> newDateSchema() { // supplies the value assigned to Schema.DATE; the impl class is named by string, not referenced at compile time
+ return catchExceptions(
+ () -> (Schema<Date>) getStaticMethod("org.apache.pulsar.client.impl.schema.DateSchema", "of", null) // looks up DateSchema.of; the trailing null presumably means "no parameter types" (getStaticMethod is defined outside this hunk)
+ .invoke(null, null)); // presumably Method.invoke: first null = no receiver (static call), second null = no arguments
+ }
+
+ public static Schema<Time> newTimeSchema() { // supplies the value assigned to Schema.TIME; the impl class is named by string, not referenced at compile time
+ return catchExceptions(
+ () -> (Schema<Time>) getStaticMethod("org.apache.pulsar.client.impl.schema.TimeSchema", "of", null) // looks up TimeSchema.of; the trailing null presumably means "no parameter types" (getStaticMethod is defined outside this hunk)
+ .invoke(null, null)); // presumably Method.invoke: first null = no receiver (static call), second null = no arguments
+ }
+
+ public static Schema<Timestamp> newTimestampSchema() { // supplies the value assigned to Schema.TIMESTAMP; the impl class is named by string, not referenced at compile time
+ return catchExceptions(
+ () -> (Schema<Timestamp>) getStaticMethod("org.apache.pulsar.client.impl.schema.TimestampSchema", "of", null) // looks up TimestampSchema.of; the trailing null presumably means "no parameter types" (getStaticMethod is defined outside this hunk)
+ .invoke(null, null)); // presumably Method.invoke: first null = no receiver (static call), second null = no arguments
+ }
+
public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 42884af..7f0f5c9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -74,6 +74,24 @@ public enum SchemaType {
BYTES,
/**
+ * Date
+ * @since 2.4.0
+ */
+ DATE,
+
+ /**
+ * Time
+ * @since 2.4.0
+ */
+ TIME,
+
+ /**
+ * Timestamp
+ * @since 2.4.0
+ */
+ TIMESTAMP,
+
+ /**
* JSON object encoding and validation
*/
JSON,
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index a069189..cf70318 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -186,6 +186,23 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+ <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
new file mode 100644
index 0000000..819a7d4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.util.Date;
+
+/**
+ * A schema for `java.util.Date` or `java.sql.Date`: {@link #encode} writes only the `getTime()` long (via {@link LongSchema}), and {@link #decode} always rebuilds a `java.util.Date` from it.
+ */
+public class DateSchema implements Schema<Date> {
+ public static DateSchema of() { // static accessor; every call hands back the same INSTANCE
+ return INSTANCE;
+ }
+
+ private static final DateSchema INSTANCE = new DateSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo() // built once and returned by getSchemaInfo()
+ .setName("Date")
+ .setType(SchemaType.DATE)
+ .setSchema(new byte[0]); // schema definition bytes are left empty
+
+ @Override
+ public byte[] encode(Date message) {
+ if (null == message) { // null passes through as null instead of throwing
+ return null;
+ }
+
+ Long date = message.getTime(); // milliseconds since the epoch, boxed to Long before being handed to LongSchema
+ return LongSchema.of().encode(date);
+ }
+
+ @Override
+ public Date decode(byte[] bytes) {
+ if (null == bytes) { // mirror of encode: null in, null out
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(bytes); // LongSchema turns the payload back into the Long written by encode()
+ return new Date(decode); // auto-unboxed into the Date(long) constructor
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index 67aca5b..ab6f134 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
@@ -121,6 +122,16 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
case BYTES:
baseSchema = SchemaBuilder.builder().bytesType();
break;
+ // DATE, TIME, TIMESTAMP support from generic record
+ case DATE:
+ baseSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ break;
+ case TIME:
+ baseSchema = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+ break;
+ case TIMESTAMP:
+ baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+ break;
default:
throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
new file mode 100644
index 0000000..212b555
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.sql.Time;
+
+/**
+ * A schema for `java.sql.Time`: {@link #encode} writes only the `getTime()` long (via {@link LongSchema}), and {@link #decode} rebuilds a `Time` from that long.
+ */
+public class TimeSchema implements Schema<Time> {
+ public static TimeSchema of() { // static accessor; every call hands back the same INSTANCE
+ return INSTANCE;
+ }
+
+ private static final TimeSchema INSTANCE = new TimeSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo() // built once and returned by getSchemaInfo()
+ .setName("Time")
+ .setType(SchemaType.TIME)
+ .setSchema(new byte[0]); // schema definition bytes are left empty
+
+ @Override
+ public byte[] encode(Time message) {
+ if (null == message) { // null passes through as null instead of throwing
+ return null;
+ }
+
+ Long time = message.getTime(); // milliseconds since the epoch, boxed to Long before being handed to LongSchema
+ return LongSchema.of().encode(time);
+ }
+
+ @Override
+ public Time decode(byte[] bytes) {
+ if (null == bytes) { // mirror of encode: null in, null out
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(bytes); // LongSchema turns the payload back into the Long written by encode()
+ return new Time(decode); // auto-unboxed into the Time(long) constructor
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
new file mode 100644
index 0000000..de8646f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.sql.Timestamp;
+
+/**
+ * A schema for `java.sql.Timestamp`: {@link #encode} writes only the `getTime()` long (via {@link LongSchema}), so any sub-millisecond nanos on the value are not carried; {@link #decode} rebuilds a `Timestamp` from that long.
+ */
+public class TimestampSchema implements Schema<Timestamp> {
+ public static TimestampSchema of() { // static accessor; every call hands back the same INSTANCE
+ return INSTANCE;
+ }
+
+ private static final TimestampSchema INSTANCE = new TimestampSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo() // built once and returned by getSchemaInfo()
+ .setName("Timestamp")
+ .setType(SchemaType.TIMESTAMP)
+ .setSchema(new byte[0]); // schema definition bytes are left empty
+
+ @Override
+ public byte[] encode(Timestamp message) {
+ if (null == message) { // null passes through as null instead of throwing
+ return null;
+ }
+
+ Long timestamp = message.getTime(); // milliseconds since the epoch, boxed to Long before being handed to LongSchema
+ return LongSchema.of().encode(timestamp);
+ }
+
+ @Override
+ public Timestamp decode(byte[] bytes) {
+ if (null == bytes) { // mirror of encode: null in, null out
+ return null;
+ }
+
+ Long decode = LongSchema.of().decode(bytes); // LongSchema turns the payload back into the Long written by encode()
+ return new Timestamp(decode); // auto-unboxed into the Timestamp(long) constructor
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 3cf0e73..6e31dbd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
import java.math.BigDecimal;
import java.util.Arrays;
+import java.util.Date;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -41,9 +42,12 @@ import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
-
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.avro.generated.NasaMission;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
@@ -259,4 +263,43 @@ public class AvroSchemaTest {
}
+ @Test
+ public void testDateAndTimestamp() { // checks builder-made DATE/TIME/TIMESTAMP fields against the generated NasaMission schema, then round-trips one record
+ RecordSchemaBuilder recordSchemaBuilder =
+ SchemaBuilder.record("org.apache.pulsar.client.avro.generated.NasaMission"); // record name matches the fully-qualified generated class
+ recordSchemaBuilder.field("id")
+ .type(SchemaType.INT32);
+ recordSchemaBuilder.field("name")
+ .type(SchemaType.STRING);
+ recordSchemaBuilder.field("create_year")
+ .type(SchemaType.DATE);
+ recordSchemaBuilder.field("create_time")
+ .type(SchemaType.TIME);
+ recordSchemaBuilder.field("create_timestamp")
+ .type(SchemaType.TIMESTAMP);
+ SchemaInfo schemaInfo = recordSchemaBuilder.build(
+ SchemaType.AVRO
+ );
+
+ org.apache.avro.Schema recordSchema = new org.apache.avro.Schema.Parser().parse( // re-parse the builder's schema bytes as an Avro schema
+ new String(schemaInfo.getSchema(), UTF_8)
+ );
+ AvroSchema<NasaMission> avroSchema = AvroSchema.of(SchemaDefinition.<NasaMission>builder().withPojo(NasaMission.class).build());
+ assertEquals(recordSchema, avroSchema.schema); // builder output must equal the schema AvroSchema holds for the generated class
+
+ NasaMission nasaMission = NasaMission.newBuilder()
+ .setId(1001)
+ .setName("one")
+ .setCreateYear(new LocalDate(new Date().getTime())) // each temporal setter takes its own fresh `new Date()` reading
+ .setCreateTime(new LocalTime(new Date().getTime()))
+ .setCreateTimestamp(new DateTime(new Date().getTime()))
+ .build();
+
+ byte[] bytes = avroSchema.encode(nasaMission);
+ Assert.assertTrue(bytes.length > 0); // encoding produced a non-empty payload
+
+ NasaMission object = avroSchema.decode(bytes);
+ assertEquals(object, nasaMission); // decoded record must equal the one that was encoded
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
index 8326b56..31d1c14 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
@@ -24,6 +24,9 @@ import static org.testng.Assert.assertNull;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -53,6 +56,9 @@ public class PrimitiveSchemaTest {
put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
+ put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+ put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+ put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
}
};
@@ -68,6 +74,9 @@ public class PrimitiveSchemaTest {
put(Schema.DOUBLE, Arrays.asList(5678567.12312d, -5678567.12341d));
put(Schema.BYTES, Arrays.asList("my string".getBytes(UTF_8)));
put(Schema.BYTEBUFFER, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+ put(Schema.DATE, Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+ put(Schema.TIME, Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+ put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new java.util.Date().getTime() - 10000), new Timestamp(new java.util.Date().getTime())));
}
};
@@ -113,6 +122,9 @@ public class PrimitiveSchemaTest {
assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.BYTES, ByteBufferSchema.of().getSchemaInfo().getType());
assertEquals(SchemaType.BYTES, ByteBufSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.DATE, DateSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.TIME, TimeSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.TIMESTAMP, TimestampSchema.of().getSchemaInfo().getType());
}
diff --git a/pulsar-client/src/test/resources/avro/NasaMission.avsc b/pulsar-client/src/test/resources/avro/NasaMission.avsc
new file mode 100644
index 0000000..c7620af
--- /dev/null
+++ b/pulsar-client/src/test/resources/avro/NasaMission.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.pulsar.client.avro.generated",
+ "type": "record",
+ "name": "NasaMission",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "create_year", "type": { "type": "int", "logicalType": "date" }},
+ {"name": "create_time", "type": { "type": "int", "logicalType": "time-millis"}},
+ {"name": "create_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }}
+ ]
+}