You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/01 15:59:37 UTC
[pulsar] branch master updated: [improve][connector] JDBC sinks: support Avro specific datatypes (#15845)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3530349a47b [improve][connector] JDBC sinks: support Avro specific datatypes (#15845)
3530349a47b is described below
commit 3530349a47b2ce7255778503a3db01391ef6387e
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 1 17:59:29 2022 +0200
[improve][connector] JDBC sinks: support Avro specific datatypes (#15845)
---
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 40 +++++-
.../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 140 +++++++++++++++++++++
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 12 +-
3 files changed, 187 insertions(+), 5 deletions(-)
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index a4a746ebd89..e5efb15362a 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.jdbc;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.sql.PreparedStatement;
import java.util.HashMap;
@@ -135,7 +136,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
}
}
- private Object getValueFromJsonNode(final JsonNode fn) {
+ private static Object getValueFromJsonNode(final JsonNode fn) {
if (fn == null || fn.isNull()) {
return null;
}
@@ -159,7 +160,7 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
}
}
- private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
+ private static void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericObject> schema,
GenericObject record,
Map<String, Object> data) {
switch (schema.getSchemaInfo().getType()) {
@@ -176,7 +177,8 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
org.apache.avro.generic.GenericRecord avroNode =
(org.apache.avro.generic.GenericRecord) record.getNativeObject();
for (Schema.Field field : avroNode.getSchema().getFields()) {
- data.put(field.name(), avroNode.get(field.name()));
+ final String fieldName = field.name();
+ data.put(fieldName, convertAvroField(avroNode.get(fieldName), field.schema()));
}
break;
default:
@@ -185,5 +187,37 @@ public abstract class BaseJdbcAutoSchemaSink extends JdbcAbstractSink<GenericObj
+ " with KeyValueSchema");
}
}
+
+ /**
+ * Converts a value read from an Avro record field into an object that can be bound to a JDBC statement.
+ *
+ * <p>NULL, INT, LONG, DOUBLE, FLOAT and BOOLEAN values are returned unchanged. STRING and ENUM values are
+ * converted with {@code toString()}, because Avro may hand them over as a {@link String} or as an
+ * {@code org.apache.avro.util.Utf8}. For a UNION schema the first non-NULL branch is used to convert the value.
+ *
+ * @param avroValue the field value; may be {@code null} (e.g. an unset field of a nullable union)
+ * @param schema the Avro schema of the field
+ * @return the converted value; {@code null} for a null STRING/ENUM (or nullable STRING/ENUM union) value
+ * @throws IllegalArgumentException if a UNION schema contains no non-NULL branch
+ * @throws UnsupportedOperationException for ARRAY, BYTES, FIXED, RECORD, MAP and any other type
+ */
+ @VisibleForTesting
+ static Object convertAvroField(Object avroValue, Schema schema) {
+ switch (schema.getType()) {
+ case NULL:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case FLOAT:
+ case BOOLEAN:
+ return avroValue;
+ case ENUM:
+ case STRING:
+ // avroValue can be a String or org.apache.avro.util.Utf8. It is null for an unset optional
+ // ["null", "string"] field (the UNION branch below skips NULL and lands here), so guard
+ // against a NullPointerException and let it map to SQL NULL.
+ return avroValue == null ? null : avroValue.toString();
+ case UNION:
+ for (Schema s : schema.getTypes()) {
+ if (s.getType() == Schema.Type.NULL) {
+ continue;
+ }
+ return convertAvroField(avroValue, s);
+ }
+ throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
+ case ARRAY:
+ case BYTES:
+ case FIXED:
+ case RECORD:
+ case MAP:
+ default:
+ throw new UnsupportedOperationException("Unsupported avro schema type=" + schema.getType()
+ + " for value field schema " + schema.getName());
+ }
+ }
}
diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
new file mode 100644
index 00000000000..de31466466d
--- /dev/null
+++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.jdbc;
+
+import java.util.function.Function;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.util.Utf8;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link BaseJdbcAutoSchemaSink#convertAvroField(Object, Schema)}.
+ */
+public class BaseJdbcAutoSchemaSinkTest {
+
+ @Test
+ public void testConvertAvroString() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField("mystring", createFieldAndGetSchema((builder) ->
+ builder.name("field").type().stringType().noDefault()));
+ Assert.assertEquals(converted, "mystring");
+
+ // Avro readers commonly deliver strings as Utf8; they must be converted to java.lang.String.
+ converted = BaseJdbcAutoSchemaSink.convertAvroField(new Utf8("mystring"), createFieldAndGetSchema((builder) ->
+ builder.name("field").type().stringType().noDefault()));
+ Assert.assertEquals(converted, "mystring");
+ }
+
+ @Test
+ public void testConvertAvroInt() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Integer.MIN_VALUE, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().intType().noDefault()));
+ Assert.assertEquals(converted, Integer.MIN_VALUE);
+ }
+
+ @Test
+ public void testConvertAvroLong() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Long.MIN_VALUE, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().longType().noDefault()));
+ Assert.assertEquals(converted, Long.MIN_VALUE);
+ }
+
+ @Test
+ public void testConvertAvroBoolean() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(true, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().booleanType().noDefault()));
+ Assert.assertEquals(converted, true);
+ }
+
+ @Test
+ public void testConvertAvroEnum() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField("e1", createFieldAndGetSchema((builder) ->
+ builder.name("field").type().enumeration("myenum").symbols("e1", "e2").noDefault()));
+ Assert.assertEquals(converted, "e1");
+ }
+
+ @Test
+ public void testConvertAvroFloat() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Float.MIN_VALUE, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().floatType().noDefault()));
+ Assert.assertEquals(converted, Float.MIN_VALUE);
+ }
+
+ @Test
+ public void testConvertAvroDouble() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Double.MIN_VALUE, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().doubleType().noDefault()));
+ Assert.assertEquals(converted, Double.MIN_VALUE);
+ }
+
+ @Test
+ public void testConvertAvroUnion() {
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(Integer.MAX_VALUE, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().unionOf().intType().endUnion().noDefault()));
+ Assert.assertEquals(converted, Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testConvertAvroNullableUnion() {
+ // ["null", "string"] is how Avro models an optional string field: the NULL branch must be skipped
+ // and the value converted through the STRING branch (Utf8 -> String).
+ Object converted = BaseJdbcAutoSchemaSink.convertAvroField(new Utf8("mystring"), createFieldAndGetSchema((builder) ->
+ builder.name("field").type().unionOf().nullType().and().stringType().endUnion().noDefault()));
+ Assert.assertEquals(converted, "mystring");
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+ public void testNotSupportedAvroTypesBytes() {
+ BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().bytesType().noDefault()));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+ public void testNotSupportedAvroTypesFixed() {
+ BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().fixed("fix").size(16).noDefault()));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+ public void testNotSupportedAvroTypesRecord() {
+ BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+ builder.name("field").type()
+ .record("myrecord").fields()
+ .name("f1").type().intType().noDefault()
+ .endRecord().noDefault()));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+ public void testNotSupportedAvroTypesMap() {
+ BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().map().values().stringType().noDefault()));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class,
+ expectedExceptionsMessageRegExp = "Unsupported avro schema type.*")
+ public void testNotSupportedAvroTypesArray() {
+ BaseJdbcAutoSchemaSink.convertAvroField(null, createFieldAndGetSchema((builder) ->
+ builder.name("field").type().array().items().stringType().noDefault()));
+ }
+
+ /**
+ * Builds a single-field record schema with the given field definition and returns that field's schema.
+ */
+ private static Schema createFieldAndGetSchema(Function<SchemaBuilder.FieldAssembler<Schema>,
+ SchemaBuilder.FieldAssembler<Schema>> consumer) {
+ final SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("record")
+ .fields();
+ return consumer.apply(record).endRecord().getFields().get(0).schema();
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d92d437f664..1ce3bc5b353 100644
--- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -425,6 +426,7 @@ public class SqliteJdbcSinkTest {
RecordSchemaBuilder valueSchemaBuilder = org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
valueSchemaBuilder.field("string").type(SchemaType.STRING).optional().defaultValue(null);
+ valueSchemaBuilder.field("stringutf8").type(SchemaType.STRING).optional().defaultValue(null);
valueSchemaBuilder.field("int").type(SchemaType.INT32).optional().defaultValue(null);
valueSchemaBuilder.field("bool").type(SchemaType.BOOLEAN).optional().defaultValue(null);
valueSchemaBuilder.field("double").type(SchemaType.DOUBLE).optional().defaultValue(null);
@@ -434,6 +436,7 @@ public class SqliteJdbcSinkTest {
GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
.set("string", "thestring")
+ .set("stringutf8", schemaType == SchemaType.AVRO ? new Utf8("thestringutf8"): "thestringutf8")
.set("int", Integer.MAX_VALUE)
.set("bool", true)
.set("double", Double.MAX_VALUE)
@@ -475,6 +478,8 @@ public class SqliteJdbcSinkTest {
"CREATE TABLE kvtable (" +
" key TEXT," +
" int INTEGER," +
+ " string TEXT," +
+ " stringutf8 TEXT," +
" nulltext TEXT," +
" bool NUMERIC," +
" double NUMERIC," +
@@ -488,7 +493,7 @@ public class SqliteJdbcSinkTest {
conf.put("jdbcUrl", jdbcUrl);
conf.put("tableName", "kvtable");
conf.put("key", "key");
- conf.put("nonKey", "long,int,double,float,bool,nulltext");
+ conf.put("nonKey", "long,int,double,float,bool,nulltext,string,stringutf8");
// change batchSize to 1, to flush on each write.
conf.put("batchSize", 1);
try (SqliteJdbcAutoSchemaSink kvSchemaJdbcSink = new SqliteJdbcAutoSchemaSink();) {
@@ -496,9 +501,12 @@ public class SqliteJdbcSinkTest {
kvSchemaJdbcSink.write(genericObjectRecord);
Awaitility.await().untilAsserted(() -> {
- final int count = sqliteUtils.select("select int,bool,double,float,long,nulltext from kvtable where key='mykey'", (resultSet) -> {
+ final int count = sqliteUtils.select("select int,string,stringutf8,bool,double,float," +
+ "long,nulltext from kvtable where key='mykey'", (resultSet) -> {
int index = 1;
Assert.assertEquals(resultSet.getInt(index++), Integer.MAX_VALUE);
+ Assert.assertEquals(resultSet.getString(index++), "thestring");
+ Assert.assertEquals(resultSet.getString(index++), "thestringutf8");
Assert.assertEquals(resultSet.getBoolean(index++), true);
Assert.assertEquals(resultSet.getDouble(index++), Double.MAX_VALUE);
Assert.assertEquals(resultSet.getFloat(index++), Float.MAX_VALUE);