You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:41:44 UTC

[pulsar] branch master updated: [schema] Fix joda dependency issue. (#4207)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 364ed5e  [schema] Fix joda dependency issue. (#4207)
364ed5e is described below

commit 364ed5e91b0170e5f40a36e12be5e5e520377a2f
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat May 18 22:41:38 2019 +0800

    [schema] Fix joda dependency issue. (#4207)
    
    * [schema] Fix joda dependency issue.
    
    * [schema] Add binary license of joda-time.
    
    * [schema] Make joda-time dependency optional.
    
    * [schema] Change log level to debug for logical type init.
    
    * [schema] fix joda-time license in presto distribution
    
    * [schema] fix joda-time license in presto distribution
---
 distribution/server/src/assemble/LICENSE.bin.txt   |  3 +-
 pom.xml                                            |  5 ++
 pulsar-client/pom.xml                              |  7 ++-
 .../pulsar/client/impl/schema/AvroSchema.java      | 43 ++++++++-------
 pulsar-sql/presto-distribution/LICENSE             |  1 +
 tests/integration/pom.xml                          |  6 +++
 .../tests/integration/schema/SchemaTest.java       | 61 ++++++++++++++++++++++
 .../pulsar/tests/integration/schema/Schemas.java   | 30 +++++++++++
 8 files changed, 136 insertions(+), 20 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index af125e2..8b81a87 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -464,7 +464,8 @@ The Apache Software License, Version 2.0
     - io.kubernetes-client-java-api-2.0.0.jar
     - io.kubernetes-client-java-proto-2.0.0.jar
   * Joda Time
-    - joda-time-joda-time-2.9.3.jar
+    - joda-time-2.10.1.jar
+    - joda-time-joda-time-2.10.1.jar
   * Dropwizard
     - io.dropwizard.metrics-metrics-core-3.1.0.jar
     - io.dropwizard.metrics-metrics-graphite-3.1.0.jar
diff --git a/pom.xml b/pom.xml
index 2c05b23..5b28f3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -983,6 +983,11 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>elasticsearch-rest-high-level-client</artifactId>
         <version>${elasticsearch.version}</version>
       </dependency>
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>${joda.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 9590906..f790074 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -116,6 +116,12 @@
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-jsonSchema</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>provided</scope>
+    </dependency>
     
     <!-- httpclient-hostname-verification depends on below dependencies  --> 
     <dependency>
@@ -139,7 +145,6 @@
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
-      <version>${joda.version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index efe08d2..14021f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -44,27 +44,34 @@ public class AvroSchema<T> extends StructSchema<T> {
 //      https://issues.apache.org/jira/browse/AVRO-1891  bug address explain
 //      fix the avro logical type read and write
     static {
-        ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
+        try {
+            ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
 
-        reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
-        reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+            reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
 
-        ReflectData reflectDataNotAllowNull = ReflectData.get();
+            ReflectData reflectDataNotAllowNull = ReflectData.get();
 
-        reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
-        reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+            reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
+        } catch (Throwable t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Avro logical types are not available. If you are going to use avro logical types, " +
+                        "you can include `joda-time` in your dependency.");
+            }
+        }
     }
 
     private AvroSchema(SchemaInfo schemaInfo) {
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e96086e..9bf082c 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -254,6 +254,7 @@ The Apache Software License, Version 2.0
     - netty-transport-native-unix-common-4.1.31.Final.jar
  * Joda Time
     - joda-time-2.9.9.jar
+    - joda-time-2.10.1.jar
  * Jetty
     - http2-client-9.4.11.v20180605.jar
     - http2-common-9.4.11.v20180605.jar
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 14a6352..1029579 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -131,6 +131,12 @@
   	  <artifactId>elasticsearch-rest-high-level-client</artifactId>
   	</dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index 12b8122..8be7a14 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -31,10 +31,17 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.schema.Schemas.Person;
 import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
 import org.apache.pulsar.tests.integration.schema.Schemas.Student;
+import org.apache.pulsar.tests.integration.schema.Schemas.AvroLogicalType;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.chrono.ISOChronology;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.math.BigDecimal;
+
 /**
  * Test Pulsar Schema.
  */
@@ -157,4 +164,58 @@ public class SchemaTest extends PulsarTestSuite {
         }
     }
 
+    @Test
+    public void testAvroLogicalType() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topic = "test-logical-type-schema";
+        final String fqtn = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topic
+        ).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(pulsarCluster.getClusterName())
+        );
+
+        AvroLogicalType messageForSend = AvroLogicalType.builder()
+                .decimal(new BigDecimal("12.34"))
+                .timestampMicros(System.currentTimeMillis() * 1000)
+                .timestampMillis(new DateTime("2019-03-26T04:39:58.469Z", ISOChronology.getInstanceUTC()))
+                .timeMillis(LocalTime.now())
+                .timeMicros(System.currentTimeMillis() * 1000)
+                .date(LocalDate.now())
+                .build();
+
+        try (Producer<AvroLogicalType> producer = client
+                .newProducer(Schema.AVRO(AvroLogicalType.class))
+                .topic(fqtn)
+                .create()
+        ) {
+            producer.send(messageForSend);
+            log.info("Successfully published avro logical type message : {}", messageForSend);
+        }
+
+        try (Consumer<AvroLogicalType> consumer = client
+                .newConsumer(Schema.AVRO(AvroLogicalType.class))
+                .topic(fqtn)
+                .subscribe()
+        ) {
+            AvroLogicalType received = consumer.receive().getValue();
+            assertEquals(messageForSend.getDecimal(), received.getDecimal());
+            assertEquals(messageForSend.getTimeMicros(), received.getTimeMicros());
+            assertEquals(messageForSend.getTimeMillis(), received.getTimeMillis());
+            assertEquals(messageForSend.getTimestampMicros(), received.getTimestampMicros());
+            assertEquals(messageForSend.getTimestampMillis(), received.getTimestampMillis());
+            assertEquals(messageForSend.getDate(), received.getDate());
+
+            log.info("Successfully consumer avro logical type message : {}", received);
+        }
+
+
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
index 2c4af95..25180dd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -31,12 +31,18 @@
  */
 package org.apache.pulsar.tests.integration.schema;
 
+import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.avro.reflect.AvroDefault;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.math.BigDecimal;
 
 /**
  * Keep a list of schemas for testing.
@@ -92,6 +98,30 @@ public final class Schemas {
 
     }
 
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    @Builder
+    public static class AvroLogicalType{
+        @org.apache.avro.reflect.AvroSchema("{\n" +
+                "  \"type\": \"bytes\",\n" +
+                "  \"logicalType\": \"decimal\",\n" +
+                "  \"precision\": 4,\n" +
+                "  \"scale\": 2\n" +
+                "}")
+        BigDecimal decimal;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"date\"}")
+        LocalDate date;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
+        DateTime timestampMillis;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"int\",\"logicalType\":\"time-millis\"}")
+        LocalTime timeMillis;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}")
+        long timestampMicros;
+        @org.apache.avro.reflect.AvroSchema("{\"type\":\"long\",\"logicalType\":\"time-micros\"}")
+        long timeMicros;
+    }
+
     private Schemas() {}
 
 }