You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:41:44 UTC
[pulsar] branch master updated: [schema] Fix joda dependency issue. (#4207)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 364ed5e [schema] Fix joda dependency issue. (#4207)
364ed5e is described below
commit 364ed5e91b0170e5f40a36e12be5e5e520377a2f
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat May 18 22:41:38 2019 +0800
[schema] Fix joda dependency issue. (#4207)
* [schema] Fix joda dependency issue.
* [schema] Add binary license of joda-time.
* [schema] Make joda-time dependency optional.
* [schema] Change log level to debug for logical type init.
* [schema] fix joda-time license in presto distribution
* [schema] fix joda-time license in presto distribution
---
distribution/server/src/assemble/LICENSE.bin.txt | 3 +-
pom.xml | 5 ++
pulsar-client/pom.xml | 7 ++-
.../pulsar/client/impl/schema/AvroSchema.java | 43 ++++++++-------
pulsar-sql/presto-distribution/LICENSE | 1 +
tests/integration/pom.xml | 6 +++
.../tests/integration/schema/SchemaTest.java | 61 ++++++++++++++++++++++
.../pulsar/tests/integration/schema/Schemas.java | 30 +++++++++++
8 files changed, 136 insertions(+), 20 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index af125e2..8b81a87 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -464,7 +464,8 @@ The Apache Software License, Version 2.0
- io.kubernetes-client-java-api-2.0.0.jar
- io.kubernetes-client-java-proto-2.0.0.jar
* Joda Time
- - joda-time-joda-time-2.9.3.jar
+ - joda-time-2.10.1.jar
+ - joda-time-joda-time-2.10.1.jar
* Dropwizard
- io.dropwizard.metrics-metrics-core-3.1.0.jar
- io.dropwizard.metrics-metrics-graphite-3.1.0.jar
diff --git a/pom.xml b/pom.xml
index 2c05b23..5b28f3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -983,6 +983,11 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 9590906..f790074 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -116,6 +116,12 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jsonSchema</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- httpclient-hostname-verification depends on below dependencies -->
<dependency>
@@ -139,7 +145,6 @@
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
- <version>${joda.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index efe08d2..14021f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -44,27 +44,34 @@ public class AvroSchema<T> extends StructSchema<T> {
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
- ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
+ try {
+ ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
- reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
- reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+ reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
- ReflectData reflectDataNotAllowNull = ReflectData.get();
+ ReflectData reflectDataNotAllowNull = ReflectData.get();
- reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
- reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+ } catch (Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Avro logical types are not available. If you are going to use avro logical types, " +
+ "you can include `joda-time` in your dependency.");
+ }
+ }
}
private AvroSchema(SchemaInfo schemaInfo) {
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e96086e..9bf082c 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -254,6 +254,7 @@ The Apache Software License, Version 2.0
- netty-transport-native-unix-common-4.1.31.Final.jar
* Joda Time
- joda-time-2.9.9.jar
+ - joda-time-2.10.1.jar
* Jetty
- http2-client-9.4.11.v20180605.jar
- http2-common-9.4.11.v20180605.jar
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 14a6352..1029579 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -131,6 +131,12 @@
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 12b8122..8be7a14 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -31,10 +31,17 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.tests.integration.schema.Schemas.Person;
import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
import org.apache.pulsar.tests.integration.schema.Schemas.Student;
+import org.apache.pulsar.tests.integration.schema.Schemas.AvroLogicalType;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.math.BigDecimal;
+
/**
* Test Pulsar Schema.
*/
@@ -157,4 +164,58 @@ public class SchemaTest extends PulsarTestSuite {
}
}
+ @Test // Sends one AvroLogicalType message and checks every field read back by a consumer equals what was sent.
+ public void testAvroLogicalType() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16); // namespace name gets a random suffix on each run
+ final String topic = "test-logical-type-schema";
+ final String fqtn = TopicName.get( // fully qualified persistent topic name built from tenant/namespace/topic
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topic
+ ).toString();
+
+ admin.namespaces().createNamespace( // create the namespace on the test cluster before producing
+ tenant + "/" + namespace,
+ Sets.newHashSet(pulsarCluster.getClusterName())
+ );
+
+ AvroLogicalType messageForSend = AvroLogicalType.builder() // populate each field of the fixture
+ .decimal(new BigDecimal("12.34"))
+ .timestampMicros(System.currentTimeMillis() * 1000) // current epoch millis scaled by 1000
+ .timestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC())) // fixed instant, UTC chronology
+ .timeMillis(LocalTime.now())
+ .timeMicros(System.currentTimeMillis() * 1000) // NOTE(review): epoch-based value rather than a time of day - confirm this is intended
+ .date(LocalDate.now())
+ .build();
+
+ try (Producer<AvroLogicalType> producer = client // producer is closed by try-with-resources after the send
+ .newProducer(Schema.AVRO(AvroLogicalType.class))
+ .topic(fqtn)
+ .create()
+ ) {
+ producer.send(messageForSend);
+ log.info("Successfully published avro logical type message : {}", messageForSend);
+ }
+
+ try (Consumer<AvroLogicalType> consumer = client // NOTE(review): no subscription name or initial position is set here - confirm the consumer can see the message sent above
+ .newConsumer(Schema.AVRO(AvroLogicalType.class))
+ .topic(fqtn)
+ .subscribe()
+ ) {
+ AvroLogicalType received = consumer.receive().getValue(); // waits for a single message and decodes it with the Avro schema
+ assertEquals(messageForSend.getDecimal(), received.getDecimal()); // field-by-field comparison of sent vs. received
+ assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros());
+ assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis());
+ assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros());
+ assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis());
+ assertEquals(messageForSend.getDate(), received.getDate());
+
+ log.info("Successfully consumer avro logical type message : {}", received);
+ }
+
+
+ }
+
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
index 2c4af95..25180dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -31,12 +31,18 @@
*/
package org.apache.pulsar.tests.integration.schema;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.avro.reflect.AvroDefault;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.math.BigDecimal;
/**
* Keep a list of schemas for testing.
@@ -92,6 +98,30 @@ public final class Schemas {
}
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ @Builder
+ public static class AvroLogicalType{ // test fixture: one field per Avro logical type, each with an explicit schema annotation
+ @org.apache.avro.reflect.AvroSchema("{\n" +
+ " \"type\": \"bytes\",\n" +
+ " \"logicalType\": \"decimal\",\n" +
+ " \"precision\": 4,\n" +
+ " \"scale\": 2\n" +
+ "}")
+ BigDecimal decimal; // decimal logical type stored as bytes, precision 4 and scale 2
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
+ LocalDate date; // date logical type stored as int
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
+ DateTime timestampMillis; // timestamp-millis logical type stored as long
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
+ LocalTime timeMillis; // time-millis logical type stored as int
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
+ long timestampMicros; // timestamp-micros logical type kept as a raw long
+ @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
+ long timeMicros; // time-micros logical type kept as a raw long
+ }
+
private Schemas() {}
}