You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/01 15:59:37 UTC

[pulsar] branch master updated: [improve][connector] JDBC sinks: support Avro specific datatypes (#15845)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3530349a47b [improve][connector] JDBC sinks: support Avro specific datatypes (#15845)
3530349a47b is described below

commit 3530349a47b2ce7255778503a3db01391ef6387e
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 1 17:59:29 2022 +0200

    [improve][connector] JDBC sinks: support Avro specific datatypes (#15845)
---
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java     |  40 +++++-
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 140 +++++++++++++++++++++
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  |  12 +-
 3 files changed, 187 insertions(+), 5 deletions(-)

diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index a4a746ebd89..e5efb15362a 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.jdbc;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import java.sql.PreparedStatement;
 import java.util.HashMap;
@@ -135,7 +136,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
         }
     }
 
-    private Object getValueFromJsonNode(final JsonNode fn) {
+    private static Object getValueFromJsonNode(final JsonNode fn) {
         if (fn == null || fn.isNull()) {
             return null;
         }
@@ -159,7 +160,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
         }
     }
 
-    private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
+    private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
                                         GenericObject record,
                                         Map<String, Object> data) {
         switch (schema.getSchemaInfo().getType()) {
@@ -176,7 +177,8 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
                 org.apache.avro.generic.GenericRecord avroNode =
                         (org.apache.avro.generic.GenericRecord) record.getNativeObject();
                 for (Schema.Field field : avroNode.getSchema().getFields()) {
-                    data.put(field.name(), avroNode.get(field.name()));
+                    final String fieldName = field.name();
+                    data.put(fieldName, convertAvroField(avroNode.get(fieldName), field.schema()));
                 }
                 break;
             default:
@@ -185,5 +187,37 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
                         + " with KeyValueSchema");
         }
     }
+
+    /**
+     * Converts an Avro field value into the plain Java object used as the field's value by the sink.
+     * Primitives pass through, STRING/ENUM become java.lang.String (null stays null), a UNION uses its first
+     * non-NULL branch. @throws UnsupportedOperationException for ARRAY, BYTES, FIXED, RECORD and MAP schemas
+     */
+    @VisibleForTesting
+    static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case ENUM:
+            case STRING:
+                // String or org.apache.avro.util.Utf8; null when a nullable (union with NULL) field is unset.
+                return avroValue == null ? null : avroValue.toString();
+            case UNION:
+                for (Schema s : schema.getTypes()) {
+                    if (s.getType() != Schema.Type.NULL) {
+                        return convertAvroField(avroValue, s);
+                    }
+                }
+                throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
+            default: // ARRAY, BYTES, FIXED, RECORD, MAP and any other type are not supported
+                throw new UnsupportedOperationException("Unsupported avro schema type=" + schema.getType()
+                        + " for value field schema " + schema.getName());
+        }
+    }
 }
 
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
new file mode 100644
index 00000000000..de31466466d
--- /dev/null
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.util.function.Function;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.util.Utf8;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class BaseJdbcAutoSchemaSinkTest {
+    // Unit tests for BaseJdbcAutoSchemaSink.convertAvroField: one test per Avro type, then the rejected types.
+    @Test
+    public void testConvertAvroString() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField("mystring", createFieldAndGetSchema((builder) ->
+                builder.name("field").type().stringType().noDefault()));
+        Assert.assertEquals(converted, "mystring");
+        // Same check with an Avro Utf8 input: the result must still compare equal to the java.lang.String.
+        converted = BaseJdbcAutoSchemaSink.convertAvroField(new Utf8("mystring"), createFieldAndGetSchema((builder) ->
+                builder.name("field").type().stringType().noDefault()));
+        Assert.assertEquals(converted, "mystring");
+
+    }
+
+    @Test
+    public void testConvertAvroInt() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Integer.MIN_VALUE, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().intType().noDefault()));
+        Assert.assertEquals(converted, Integer.MIN_VALUE);
+    }
+
+    @Test
+    public void testConvertAvroLong() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Long.MIN_VALUE, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().longType().noDefault()));
+        Assert.assertEquals(converted, Long.MIN_VALUE);
+    }
+
+    @Test
+    public void testConvertAvroBoolean() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(true, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().booleanType().noDefault()));
+        Assert.assertEquals(converted, true);
+    }
+
+    @Test
+    public void testConvertAvroEnum() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField("e1", createFieldAndGetSchema((builder) ->
+                builder.name("field").type().enumeration("myenum").symbols("e1", "e2").noDefault()));
+        Assert.assertEquals(converted, "e1");
+    }
+
+    @Test
+    public void testConvertAvroFloat() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Float.MIN_VALUE, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().floatType().noDefault()));
+        Assert.assertEquals(converted, Float.MIN_VALUE);
+    }
+
+    @Test
+    public void testConvertAvroDouble() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Double.MIN_VALUE, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().doubleType().noDefault()));
+        Assert.assertEquals(converted, Double.MIN_VALUE);
+    }
+
+    // Union declared with a single int branch: the value is expected to come back unchanged.
+    @Test
+    public void testConvertAvroUnion() {
+        Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Integer.MAX_VALUE, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().unionOf().intType().endUnion().noDefault()));
+        Assert.assertEquals(converted, Integer.MAX_VALUE);
+    }
+    // Rejected types below: the value passed is null because the converter rejects on the schema type alone.
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+    public void testNotSupportedAvroTypesBytes() {
+        BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().bytesType().noDefault()));
+    }
+
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+    public void testNotSupportedAvroTypesFixed() {
+        BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().fixed("fix").size(16).noDefault()));
+    }
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+    public void testNotSupportedAvroTypesRecord() {
+        BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+                builder.name("field").type()
+                        .record("myrecord").fields()
+                        .name("f1").type().intType().noDefault()
+                        .endRecord().noDefault()));
+    }
+
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+    public void testNotSupportedAvroTypesMap() {
+        BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().map().values().stringType().noDefault()));
+    }
+
+
+    @Test(expectedExceptions = UnsupportedOperationException.class,
+            expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+    public void testNotSupportedAvroTypesArray() {
+        BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+                builder.name("field").type().array().items().stringType().noDefault()));
+    }
+
+    // Starts a record named "record", lets the callback add fields, and returns the first field's schema.
+    private Schema createFieldAndGetSchema(Function<SchemaBuilder.FieldAssembler<Schema>,
+            SchemaBuilder.FieldAssembler<Schema>> consumer) {
+        final SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("record")
+                .fields();
+        return consumer.apply(record).endRecord().getFields().get(0).schema();
+    }
+
+
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d92d437f664..1ce3bc5b353 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.util.Utf8;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -425,6 +426,7 @@ public class SqliteJdbcSinkTest {
 
         RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
         valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+        valueSchemaBuilder.field("stringutf8").type(SchemaType.STRING).optional().defaultValue(null);
         valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
         valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
         valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
@@ -434,6 +436,7 @@ public class SqliteJdbcSinkTest {
 
         GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
                 .set("string", "thestring")
+                .set("stringutf8", schemaType == SchemaType.AVRO ? new Utf8("thestringutf8"): "thestringutf8")
                 .set("int", Integer.MAX_VALUE)
                 .set("bool", true)
                 .set("double", Double.MAX_VALUE)
@@ -475,6 +478,8 @@ public class SqliteJdbcSinkTest {
                 "CREATE TABLE kvtable (" +
                         "    key  TEXT," +
                         "    int  INTEGER," +
+                        "    string TEXT," +
+                        "    stringutf8 TEXT," +
                         "    nulltext  TEXT," +
                         "    bool  NUMERIC," +
                         "    double NUMERIC," +
@@ -488,7 +493,7 @@ public class SqliteJdbcSinkTest {
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", "kvtable");
         conf.put("key", "key");
-        conf.put("nonKey", "long,int,double,float,bool,nulltext");
+        conf.put("nonKey", "long,int,double,float,bool,nulltext,string,stringutf8");
         // change batchSize to 1, to flush on each write.
         conf.put("batchSize", 1);
         try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new SqliteJdbcAutoSchemaSink();) {
@@ -496,9 +501,12 @@ public class SqliteJdbcSinkTest {
             kvSchemaJdbcSink.write(genericObjectRecord);
 
             Awaitility.await().untilAsserted(() -> {
-                final int count = sqliteUtils.select("select int,bool,double,float,long,nulltext from kvtable where key='mykey'", (resultSet) -> {
+                final int count = sqliteUtils.select("select int,string,stringutf8,bool,double,float," +
+                        "long,nulltext from kvtable where key='mykey'", (resultSet) -> {
                     int index = 1;
                     Assert.assertEquals(resultSet.getInt(index++), Integer.MAX_VALUE);
+                    Assert.assertEquals(resultSet.getString(index++), "thestring");
+                    Assert.assertEquals(resultSet.getString(index++), "thestringutf8");
                     Assert.assertEquals(resultSet.getBoolean(index++), true);
                     Assert.assertEquals(resultSet.getDouble(index++), Double.MAX_VALUE);
                     Assert.assertEquals(resultSet.getFloat(index++), Float.MAX_VALUE);