You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/27 05:15:10 UTC

[pulsar] branch master updated: Logical type use (#3900)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 971c981  Logical type use (#3900)
971c981 is described below

commit 971c98187bd449b0032d621fc59b9643e9c3010f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Mar 27 13:15:03 2019 +0800

    Logical type use (#3900)
    
    ## Motivation
    
    Work around an Avro bug that affects logical types on Avro fields:
    https://issues.apache.org/jira/browse/AVRO-1891
    
    ## Modifications
    Add a static initializer to AvroSchema that registers logical type conversions.
    
    ## Verifying this change
    Add logical type test in AvroSchemaTest
---
 pom.xml                                            |  1 +
 pulsar-client/pom.xml                              |  6 +++
 .../pulsar/client/impl/schema/AvroSchema.java      | 29 ++++++++++++++
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 46 ++++++++++++++++++++++
 4 files changed, 82 insertions(+)

diff --git a/pom.xml b/pom.xml
index f3731b7..481fdbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,6 +179,7 @@ flexible messaging model and an intuitive client API.</description>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.297</aws-sdk.version>
     <avro.version>1.8.2</avro.version>
+    <joda.version>2.10.1</joda.version>
     <jclouds.version>2.1.1</jclouds.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 3ad3fb4..a069189 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -136,6 +136,12 @@
       <version>${project.parent.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index a00112c..a2ce0d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.client.impl.schema;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -47,6 +50,32 @@ public class AvroSchema<T> extends StructSchema<T> {
 
     private static final ThreadLocal<BinaryDecoder> decoders =
             new ThreadLocal<>();
+//      Workaround for an Avro bug; see
+//      https://issues.apache.org/jira/browse/AVRO-1891 for details.
+//      Registers the logical type conversions used when reading and writing Avro logical types.
+    static {
+        ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
+
+        reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+
+        ReflectData reflectDataNotAllowNull = ReflectData.get();
+
+        reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+    }
 
     private AvroSchema(org.apache.avro.Schema schema,
                        SchemaDefinition schemaDefinition) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index bbd753b..3cf0e73 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 
 import lombok.Data;
@@ -44,6 +45,10 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -67,6 +72,27 @@ public class AvroSchemaTest {
         Long field3;
     }
 
+    @Data
+    private static class SchemaLogicalType{
+        @org.apache.avro.reflect.AvroSchema("{\n" +
+                "  \"type\": \"bytes\",\n" +
+                "  \"logicalType\": \"decimal\",\n" +
+                "  \"precision\": 4,\n" +
+                "  \"scale\": 2\n" +
+                "}")
+        BigDecimal decimal;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
+        LocalDate date;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
+        DateTime timestampMillis;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
+        LocalTime timeMillis;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
+        long timestampMicros;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
+        long timeMicros;
+    }
+
     @Test
     public void testSchemaDefinition() throws SchemaValidationException {
         org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
@@ -212,5 +238,25 @@ public class AvroSchemaTest {
 
     }
 
+    @Test
+    public void testLogicalType() {
+        AvroSchema<SchemaLogicalType> avroSchema = AvroSchema.of(SchemaDefinition.<SchemaLogicalType>builder().withPojo(SchemaLogicalType.class).build());
+
+        SchemaLogicalType schemaLogicalType = new SchemaLogicalType();
+        schemaLogicalType.setTimestampMicros(System.currentTimeMillis()*1000);
+        schemaLogicalType.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC()));
+        schemaLogicalType.setDecimal(new BigDecimal("12.34"));
+        schemaLogicalType.setDate(LocalDate.now());
+        schemaLogicalType.setTimeMicros(System.currentTimeMillis()*1000);
+        schemaLogicalType.setTimeMillis(LocalTime.now());
+
+        byte[] bytes1 = avroSchema.encode(schemaLogicalType);
+        Assert.assertTrue(bytes1.length > 0);
+
+        SchemaLogicalType object1 = avroSchema.decode(bytes1);
+
+        assertEquals(object1, schemaLogicalType);
+
+    }
 
 }