You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/28 06:59:30 UTC

[pulsar] branch master updated: [pulsar-client] add Date/Time/Timestamp schema (#3856)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bda6a9c  [pulsar-client] add Date/Time/Timestamp schema (#3856)
bda6a9c is described below

commit bda6a9cc8edb0aef7d969ce618fcfe212bdb425a
Author: wpl <12...@qq.com>
AuthorDate: Thu Mar 28 01:59:25 2019 -0500

    [pulsar-client] add Date/Time/Timestamp schema (#3856)
    
    Fixes #3831
---
 .gitignore                                         |  1 +
 .../java/org/apache/pulsar/client/api/Schema.java  | 18 ++++++
 .../client/internal/DefaultImplementation.java     | 21 +++++++
 .../apache/pulsar/common/schema/SchemaType.java    | 18 ++++++
 pulsar-client/pom.xml                              | 17 ++++++
 .../pulsar/client/impl/schema/DateSchema.java      | 65 ++++++++++++++++++++++
 .../client/impl/schema/FieldSchemaBuilderImpl.java | 11 ++++
 .../pulsar/client/impl/schema/TimeSchema.java      | 65 ++++++++++++++++++++++
 .../pulsar/client/impl/schema/TimestampSchema.java | 65 ++++++++++++++++++++++
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 45 ++++++++++++++-
 .../client/impl/schema/PrimitiveSchemaTest.java    | 12 ++++
 .../src/test/resources/avro/NasaMission.avsc       | 11 ++++
 12 files changed, 348 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index f7194ad..1baa025 100644
--- a/.gitignore
+++ b/.gitignore
@@ -82,3 +82,4 @@ docker.debug-info
 # Avro
 examples/flink/src/main/java/org/apache/flink/avro/generated
 pulsar-flink/src/test/java/org/apache/flink/avro/generated
+pulsar-client/src/test/java/org/apache/pulsar/client/avro/generated
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index d0bff30..18e02bf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.client.api;
 
 import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
 
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -159,6 +162,21 @@ public interface Schema<T> {
     Schema<Double> DOUBLE = DefaultImplementation.newDoubleSchema();
 
     /**
+     * Date Schema: {@link Date} values encoded as their epoch-millisecond value.
+     */
+    Schema<Date> DATE = DefaultImplementation.newDateSchema();
+
+    /**
+     * Time Schema: {@link Time} values encoded as their full epoch-millisecond value.
+     */
+    Schema<Time> TIME = DefaultImplementation.newTimeSchema();
+
+    /**
+     * Timestamp Schema: {@link Timestamp} values kept to millisecond precision only.
+     */
+    Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();
+
+    /**
      * Create a Protobuf schema type by extracting the fields of the specified class.
      *
      * @param clazz the Protobuf generated class to be used to extract the schema
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 44dbc10..ccc0a15 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -27,6 +27,9 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
 import java.util.Map;
 import java.util.function.Supplier;
 
@@ -187,6 +190,24 @@ public class DefaultImplementation {
                         .newInstance());
     }
 
+    /** Reflectively obtains the {@code org.apache.pulsar.client.impl.schema.DateSchema} singleton. */
+    public static Schema<Date> newDateSchema() {
+        return catchExceptions(() -> (Schema<Date>) getStaticMethod(
+                "org.apache.pulsar.client.impl.schema.DateSchema", "of", null).invoke(null));
+    }
+
+    /** Reflectively obtains the {@code org.apache.pulsar.client.impl.schema.TimeSchema} singleton. */
+    public static Schema<Time> newTimeSchema() {
+        return catchExceptions(() -> (Schema<Time>) getStaticMethod(
+                "org.apache.pulsar.client.impl.schema.TimeSchema", "of", null).invoke(null));
+    }
+
+    /** Reflectively obtains the {@code org.apache.pulsar.client.impl.schema.TimestampSchema} singleton. */
+    public static Schema<Timestamp> newTimestampSchema() {
+        return catchExceptions(() -> (Schema<Timestamp>) getStaticMethod(
+                "org.apache.pulsar.client.impl.schema.TimestampSchema", "of", null).invoke(null));
+    }
+
     public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
         return catchExceptions(
                 () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 42884af..7f0f5c9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -74,6 +74,24 @@ public enum SchemaType {
     BYTES,
 
     /**
+     * Date: standalone values are epoch milliseconds; record fields use the Avro {@code date} logical type.
+     * @since 2.4.0
+     */
+    DATE,
+
+    /**
+     * Time: standalone values are epoch milliseconds; record fields use the Avro {@code time-millis} logical type.
+     * @since 2.4.0
+     */
+    TIME,
+
+    /**
+     * Timestamp: standalone values are epoch milliseconds; record fields use Avro {@code timestamp-millis}.
+     * @since 2.4.0
+     */
+    TIMESTAMP,
+
+    /**
      * JSON object encoding and validation
      */
     JSON,
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index a069189..cf70318 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -186,6 +186,23 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <!-- Compiles the test-only Avro schemas (e.g. NasaMission.avsc) into Java classes under
+             src/test/java; the generated package is listed in .gitignore. -->
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-test-sources</phase>
+            <goals><goal>schema</goal></goals>
+            <configuration>
+              <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+              <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
new file mode 100644
index 0000000..819a7d4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.util.Date;
+
+/**
+ * A schema for {@link Date} values, encoded as the {@link Date#getTime()} millisecond value
+ * using {@link LongSchema}.
+ *
+ * <p>Subclasses such as {@code java.sql.Date} can be encoded, but {@link #decode(byte[])}
+ * always returns a plain {@code java.util.Date} carrying the same millisecond value, so the
+ * runtime type of the encoded object is not preserved.
+ */
+public class DateSchema implements Schema<Date> {
+
+    private static final DateSchema INSTANCE = new DateSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Date")
+        .setType(SchemaType.DATE)
+        .setSchema(new byte[0]);
+
+    /** Returns the shared instance; this class holds no per-instance state. */
+    public static DateSchema of() {
+        return INSTANCE;
+    }
+
+    /** Encodes {@code message} as its epoch-millisecond value; {@code null} maps to {@code null}. */
+    @Override
+    public byte[] encode(Date message) {
+        return message == null ? null : LongSchema.of().encode(message.getTime());
+    }
+
+    /** Decodes bytes produced by {@link #encode(Date)}; {@code null} maps to {@code null}. */
+    @Override
+    public Date decode(byte[] bytes) {
+        return bytes == null ? null : new Date(LongSchema.of().decode(bytes));
+    }
+
+    /** Returns the {@link SchemaType#DATE} schema info, whose schema definition is empty. */
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index 67aca5b..ab6f134 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
@@ -121,6 +122,16 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
             case BYTES:
                 baseSchema = SchemaBuilder.builder().bytesType();
                 break;
+            // DATE, TIME, TIMESTAMP support from generic record
+            case DATE:
+                baseSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+                break;
+            case TIME:
+                baseSchema = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
+                break;
+            case TIMESTAMP:
+                baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+                break;
             default:
                 throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
new file mode 100644
index 0000000..212b555
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.sql.Time;
+
+/**
+ * A schema for {@link Time} values, encoded as the {@link Time#getTime()} millisecond value
+ * using {@link LongSchema}.
+ *
+ * <p>The full epoch-millisecond value is written rather than a time of day, so any date
+ * component carried by the {@code Time} instance round-trips unchanged. {@code null} is
+ * encoded and decoded as {@code null}.
+ */
+public class TimeSchema implements Schema<Time> {
+
+    private static final TimeSchema INSTANCE = new TimeSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Time")
+        .setType(SchemaType.TIME)
+        .setSchema(new byte[0]);
+
+    /** Returns the shared instance; this class holds no per-instance state. */
+    public static TimeSchema of() {
+        return INSTANCE;
+    }
+
+    /** Encodes {@code message} as its epoch-millisecond value; {@code null} maps to {@code null}. */
+    @Override
+    public byte[] encode(Time message) {
+        return message == null ? null : LongSchema.of().encode(message.getTime());
+    }
+
+    /** Decodes bytes produced by {@link #encode(Time)}; {@code null} maps to {@code null}. */
+    @Override
+    public Time decode(byte[] bytes) {
+        return bytes == null ? null : new Time(LongSchema.of().decode(bytes));
+    }
+
+    /** Returns the {@link SchemaType#TIME} schema info, whose schema definition is empty. */
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
new file mode 100644
index 0000000..de8646f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.sql.Timestamp;
+
+/**
+ * A schema for {@link Timestamp} values, encoded as the {@link Timestamp#getTime()}
+ * millisecond value using {@link LongSchema}.
+ *
+ * <p>Only millisecond precision is kept: any sub-millisecond part of
+ * {@link Timestamp#getNanos()} is dropped on encode, so such values do not round-trip
+ * exactly through {@link #decode(byte[])}.
+ */
+public class TimestampSchema implements Schema<Timestamp> {
+
+    private static final TimestampSchema INSTANCE = new TimestampSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Timestamp")
+        .setType(SchemaType.TIMESTAMP)
+        .setSchema(new byte[0]);
+
+    /** Returns the shared instance; this class holds no per-instance state. */
+    public static TimestampSchema of() {
+        return INSTANCE;
+    }
+
+    /** Encodes {@code message} as its epoch-millisecond value; {@code null} maps to {@code null}. */
+    @Override
+    public byte[] encode(Timestamp message) {
+        return message == null ? null : LongSchema.of().encode(message.getTime());
+    }
+
+    /** Decodes bytes produced by {@link #encode(Timestamp)}; {@code null} maps to {@code null}. */
+    @Override
+    public Timestamp decode(byte[] bytes) {
+        return bytes == null ? null : new Timestamp(LongSchema.of().decode(bytes));
+    }
+
+    /** Returns the {@link SchemaType#TIMESTAMP} schema info, whose schema definition is empty. */
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 3cf0e73..6e31dbd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Date;
 
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -41,9 +42,12 @@ import org.apache.avro.SchemaValidatorBuilder;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
-
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.avro.generated.NasaMission;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
@@ -259,4 +263,43 @@ public class AvroSchemaTest {
 
     }
 
+    /**
+     * Verifies that DATE/TIME/TIMESTAMP record fields built through {@link SchemaBuilder}
+     * produce the same Avro schema as the generated {@code NasaMission} class, and that a
+     * record with those logical-type fields survives an encode/decode round trip.
+     */
+    @Test
+    public void testDateAndTimestamp() {
+        RecordSchemaBuilder recordSchemaBuilder =
+            SchemaBuilder.record("org.apache.pulsar.client.avro.generated.NasaMission");
+        recordSchemaBuilder.field("id").type(SchemaType.INT32);
+        recordSchemaBuilder.field("name").type(SchemaType.STRING);
+        recordSchemaBuilder.field("create_year").type(SchemaType.DATE);
+        recordSchemaBuilder.field("create_time").type(SchemaType.TIME);
+        recordSchemaBuilder.field("create_timestamp").type(SchemaType.TIMESTAMP);
+        SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
+
+        org.apache.avro.Schema recordSchema = new org.apache.avro.Schema.Parser().parse(
+            new String(schemaInfo.getSchema(), UTF_8));
+        AvroSchema<NasaMission> avroSchema = AvroSchema.of(
+            SchemaDefinition.<NasaMission>builder().withPojo(NasaMission.class).build());
+        assertEquals(recordSchema, avroSchema.schema);
+
+        // Use a single clock reading so all three temporal fields describe the same instant.
+        long now = new Date().getTime();
+        NasaMission nasaMission = NasaMission.newBuilder()
+            .setId(1001)
+            .setName("one")
+            .setCreateYear(new LocalDate(now))
+            .setCreateTime(new LocalTime(now))
+            .setCreateTimestamp(new DateTime(now))
+            .build();
+
+        byte[] bytes = avroSchema.encode(nasaMission);
+        Assert.assertTrue(bytes.length > 0);
+
+        NasaMission object = avroSchema.decode(bytes);
+        assertEquals(object, nasaMission);
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
index 8326b56..31d1c14 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/PrimitiveSchemaTest.java
@@ -24,6 +24,9 @@ import static org.testng.Assert.assertNull;
 
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +56,9 @@ public class PrimitiveSchemaTest {
             put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
             put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
             put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
+            put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+            put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+            put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
         }
     };
 
@@ -68,6 +74,9 @@ public class PrimitiveSchemaTest {
             put(Schema.DOUBLE, Arrays.asList(5678567.12312d, -5678567.12341d));
             put(Schema.BYTES, Arrays.asList("my string".getBytes(UTF_8)));
             put(Schema.BYTEBUFFER, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+            put(Schema.DATE, Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+            put(Schema.TIME, Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+            put(Schema.TIMESTAMP, Arrays.asList(new Timestamp(new java.util.Date().getTime() - 10000), new Timestamp(new java.util.Date().getTime())));
         }
     };
 
@@ -113,6 +122,9 @@ public class PrimitiveSchemaTest {
         assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
         assertEquals(SchemaType.BYTES, ByteBufferSchema.of().getSchemaInfo().getType());
         assertEquals(SchemaType.BYTES, ByteBufSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.DATE, DateSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.TIME, TimeSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.TIMESTAMP, TimestampSchema.of().getSchemaInfo().getType());
     }
 
 
diff --git a/pulsar-client/src/test/resources/avro/NasaMission.avsc b/pulsar-client/src/test/resources/avro/NasaMission.avsc
new file mode 100644
index 0000000..c7620af
--- /dev/null
+++ b/pulsar-client/src/test/resources/avro/NasaMission.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.pulsar.client.avro.generated",
+ "type": "record",
+ "name": "NasaMission",
+ "fields": [
+     {"name": "id", "type": "int"},
+     {"name": "name", "type": "string"},
+     {"name": "create_year",  "type": { "type": "int", "logicalType": "date" }},
+     {"name": "create_time", "type": { "type": "int", "logicalType": "time-millis"}},
+     {"name": "create_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }}
+ ]
+}