You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/27 05:15:10 UTC
[pulsar] branch master updated: Logical type use (#3900)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 971c981 Logical type use (#3900)
971c981 is described below
commit 971c98187bd449b0032d621fc59b9643e9c3010f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Mar 27 13:15:03 2019 +0800
Logical type use (#3900)
## Motivation
To compromise avro‘s bug for avro filed logical type
https://issues.apache.org/jira/browse/AVRO-1891
## Modifications
add some initialize class
## Verifying this change
Add logical type test in AvroSchemaTest
---
pom.xml | 1 +
pulsar-client/pom.xml | 6 +++
.../pulsar/client/impl/schema/AvroSchema.java | 29 ++++++++++++++
.../pulsar/client/impl/schema/AvroSchemaTest.java | 46 ++++++++++++++++++++++
4 files changed, 82 insertions(+)
diff --git a/pom.xml b/pom.xml
index f3731b7..481fdbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,6 +179,7 @@ flexible messaging model and an intuitive client API.</description>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
+ <joda.version>2.10.1</joda.version>
<jclouds.version>2.1.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
<mysql-jdbc.version>8.0.11</mysql-jdbc.version>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 3ad3fb4..a069189 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -136,6 +136,12 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index a00112c..a2ce0d1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -47,6 +50,32 @@ public class AvroSchema<T> extends StructSchema<T> {
private static final ThreadLocal<BinaryDecoder> decoders =
new ThreadLocal<>();
+// the aim to fix avro's bug
+// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
+// fix the avro logical type read and write
+ static {
+ ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
+
+ reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+
+ ReflectData reflectDataNotAllowNull = ReflectData.get();
+
+ reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+ }
private AvroSchema(org.apache.avro.Schema schema,
SchemaDefinition schemaDefinition) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index bbd753b..3cf0e73 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
+import java.math.BigDecimal;
import java.util.Arrays;
import lombok.Data;
@@ -44,6 +45,10 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -67,6 +72,27 @@ public class AvroSchemaTest {
Long field3;
}
+ @Data
+ private static class SchemaLogicalType{
+ @org.apache.avro.reflect.AvroSchema("{\n" +
+ " \"type\": \"bytes\",\n" +
+ " \"logicalType\": \"decimal\",\n" +
+ " \"precision\": 4,\n" +
+ " \"scale\": 2\n" +
+ "}")
+ BigDecimal decimal;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
+ LocalDate date;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
+ DateTime timestampMillis;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
+ LocalTime timeMillis;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
+ long timestampMicros;
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
+ long timeMicros;
+ }
+
@Test
public void testSchemaDefinition() throws SchemaValidationException {
org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
@@ -212,5 +238,25 @@ public class AvroSchemaTest {
}
+ @Test
+ public void testLogicalType() {
+ AvroSchema<SchemaLogicalType> avroSchema = AvroSchema.of(SchemaDefinition.<SchemaLogicalType>builder().withPojo(SchemaLogicalType.class).build());
+
+ SchemaLogicalType schemaLogicalType = new SchemaLogicalType();
+ schemaLogicalType.setTimestampMicros(System.currentTimeMillis()*1000);
+ schemaLogicalType.setTimestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC()));
+ schemaLogicalType.setDecimal(new BigDecimal("12.34"));
+ schemaLogicalType.setDate(LocalDate.now());
+ schemaLogicalType.setTimeMicros(System.currentTimeMillis()*1000);
+ schemaLogicalType.setTimeMillis(LocalTime.now());
+
+ byte[] bytes1 = avroSchema.encode(schemaLogicalType);
+ Assert.assertTrue(bytes1.length > 0);
+
+ SchemaLogicalType object1 = avroSchema.decode(bytes1);
+
+ assertEquals(object1, schemaLogicalType);
+
+ }
}