Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/30 18:17:58 UTC

[GitHub] [pulsar] nicoloboschi opened a new pull request, #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi opened a new pull request, #15845:
URL: https://github.com/apache/pulsar/pull/15845

   ### Motivation
   JDBC sinks don't support Avro-specific datatypes such as `Utf8`, failing with:
   
   ```
   2022-05-30T16:16:35,850+0000 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception 
   java.lang.Exception: Not support value type, need to add it. class org.apache.avro.util.Utf8
   	at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.setColumnValue(BaseJdbcAutoSchemaSink.java:134) ~[pulsar-io-jdbc-core-2.10.0.5-SNAPSHOT.jar:?]
   	at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.bindValue(BaseJdbcAutoSchemaSink.java:89) ~[pulsar-io-jdbc-core-2.10.0.5-SNAPSHOT.jar:?]
   	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:206) ~[pulsar-io-jdbc-core-2.10.0.5-SNAPSHOT.jar:?]
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   ### Modifications
   
   Added a converter (inspired by the Elasticsearch one) that properly converts data from an Avro generic record to JDBC-compatible types.
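   
   A minimal sketch of this kind of conversion follows. It is not the PR's code: to stay runnable without the Avro jar, the `FieldType` enum and `FieldSchema` class are hypothetical stand-ins for `org.apache.avro.Schema` and its `Type`. It relies on the fact that Avro's `Utf8` implements `CharSequence`, so `toString()` turns it into a plain `String` that JDBC drivers accept, and it resolves a nullable union to its first non-null branch.
   
   ```java
   import java.util.List;
   
   /** Hypothetical stand-in for org.apache.avro.Schema.Type. */
   enum FieldType { NULL, INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, ENUM, UNION, BYTES, ARRAY, MAP, RECORD, FIXED }
   
   /** Hypothetical stand-in for org.apache.avro.Schema: a type plus union branches. */
   final class FieldSchema {
       final FieldType type;
       final List<FieldSchema> branches; // only meaningful for UNION
   
       FieldSchema(FieldType type, List<FieldSchema> branches) {
           this.type = type;
           this.branches = branches;
       }
   
       static FieldSchema of(FieldType type) {
           return new FieldSchema(type, List.of());
       }
   
       static FieldSchema union(FieldSchema... branches) {
           return new FieldSchema(FieldType.UNION, List.of(branches));
       }
   }
   
   final class AvroToJdbc {
       private AvroToJdbc() {}
   
       /** Converts an Avro-decoded value into something a JDBC driver can bind. */
       static Object convert(Object value, FieldSchema schema) {
           switch (schema.type) {
               case NULL:
               case INT:
               case LONG:
               case FLOAT:
               case DOUBLE:
               case BOOLEAN:
                   // Already plain Java types (or null): pass through unchanged.
                   return value;
               case STRING:
               case ENUM:
                   // Avro may hand back org.apache.avro.util.Utf8 (a CharSequence) or an
                   // enum symbol; toString() normalizes both to java.lang.String.
                   return value == null ? null : value.toString();
               case UNION:
                   // Resolve a union such as ["null", "string"] via its first non-null branch.
                   for (FieldSchema branch : schema.branches) {
                       if (branch.type != FieldType.NULL) {
                           return convert(value, branch);
                       }
                   }
                   throw new IllegalArgumentException("UNION schema has no non-null branch");
               default:
                   throw new UnsupportedOperationException("Unsupported Avro type: " + schema.type);
           }
       }
   }
   ```
   
   For example, `AvroToJdbc.convert(new StringBuilder("abc"), FieldSchema.of(FieldType.STRING))` returns the `String` `"abc"`; a real caller would pass a `Utf8`, and the `StringBuilder` here only plays the role of another `CharSequence`. The explicit null check lets a null value in a nullable union come back as `null` instead of failing on `toString()`.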
   
   - [x] `doc-not-needed` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#discussion_r885083901


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:
##########
@@ -185,5 +187,35 @@ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericO
                         + " with KeyValueSchema");
         }
     }
+
+    private static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case BYTES:
+                // system default charset
+                return new String((byte[]) avroValue);

Review Comment:
   Just realized that you meant the byte array type, like `bytea` in PostgreSQL. Will fix it.
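   
   A sketch of keeping Avro `bytes` as raw bytes instead of decoding them with the default charset, assuming the decoded value arrives either as `byte[]` (as the diff's cast expects) or as `java.nio.ByteBuffer` (the usual representation of `bytes` in Avro's generic Java API). The `AvroBytes.toByteArray` helper is hypothetical.
   
   ```java
   import java.nio.ByteBuffer;
   
   final class AvroBytes {
       private AvroBytes() {}
   
       /**
        * Copies an Avro "bytes" value into a fresh byte[] without charset decoding.
        * Accepts byte[] or ByteBuffer (assumed input representations).
        */
       static byte[] toByteArray(Object avroValue) {
           if (avroValue == null) {
               return null;
           }
           if (avroValue instanceof byte[]) {
               return ((byte[]) avroValue).clone();
           }
           if (avroValue instanceof ByteBuffer) {
               // duplicate() shares the content but has its own position and limit,
               // so reading from it does not move the caller's buffer.
               ByteBuffer view = ((ByteBuffer) avroValue).duplicate();
               byte[] out = new byte[view.remaining()];
               view.get(out);
               return out;
           }
           throw new IllegalArgumentException("Not an Avro bytes value: " + avroValue.getClass().getName());
       }
   }
   ```
   
   The resulting array can then be bound to a binary column such as `bytea` with `PreparedStatement.setBytes`, with no charset involved at any step.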


[GitHub] [pulsar] eolivelli commented on a diff in pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

eolivelli commented on code in PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#discussion_r885055797


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:
##########
@@ -185,5 +187,35 @@ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericO
                         + " with KeyValueSchema");
         }
     }
+
+    private static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case BYTES:
+                // system default charset
+                return new String((byte[]) avroValue);

Review Comment:
   This looks wrong. In JDBC we have the array type; we should use that.
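   
   Read as a binary column type (the byte array interpretation in the reply above), the JDBC way to bind such a value is `PreparedStatement.setBytes`, which the JDBC specification maps to `VARBINARY`/`LONGVARBINARY` and which PostgreSQL stores in a `bytea` column. The sketch below uses a hypothetical `bind` helper that dispatches on the converted value's Java type. To run without a database, the statement is a `java.lang.reflect.Proxy` that only records which setter was called; that is a testing device, not something a real sink would do.
   
   ```java
   import java.lang.reflect.Proxy;
   import java.sql.PreparedStatement;
   import java.sql.SQLException;
   import java.util.List;
   
   final class JdbcBinder {
       private JdbcBinder() {}
   
       /** Hypothetical helper: binds one converted value at a 1-based parameter index. */
       static void bind(PreparedStatement stmt, int index, Object value) throws SQLException {
           if (value instanceof byte[]) {
               // Binary data: no charset decoding, the driver sends raw bytes.
               stmt.setBytes(index, (byte[]) value);
           } else if (value instanceof String) {
               stmt.setString(index, (String) value);
           } else {
               // Integer, Long, Boolean, ...: let the driver choose the SQL mapping.
               stmt.setObject(index, value);
           }
       }
   
       /** Test double only: a PreparedStatement that records "method@index" per call. */
       static PreparedStatement recordingStatement(List<String> calls) {
           return (PreparedStatement) Proxy.newProxyInstance(
                   JdbcBinder.class.getClassLoader(),
                   new Class<?>[] {PreparedStatement.class},
                   (proxy, method, args) -> {
                       calls.add(method.getName() + (args == null ? "" : "@" + args[0]));
                       return null; // the setters used here all return void
                   });
       }
   }
   ```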


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi commented on code in PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#discussion_r885399546


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:
##########
@@ -185,5 +187,36 @@ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericO
                         + " with KeyValueSchema");
         }
     }
+
+    private static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case ENUM:
+            case STRING:
+                return avroValue.toString(); // can be a String or org.apache.avro.util.Utf8
+            case UNION:
+                for (Schema s : schema.getTypes()) {
+                    if (s.getType() == Schema.Type.NULL) {
+                        continue;
+                    }
+                    return convertAvroField(avroValue, s);
+                }
+                throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
+            case ARRAY:
+            case BYTES:
+            case FIXED:
+            case RECORD:
+            case MAP:

Review Comment:
   I added test coverage for the Avro conversion. PTAL again, @shibd.
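   
   The `UNION` case in the diff converts with the first non-`null` member. Below is a hedged sketch of that resolution rule together with the kinds of cases a test could cover; member types are plain strings here, a hypothetical simplification of `Schema.getTypes()` that keeps the example runnable without Avro.
   
   ```java
   import java.util.List;
   
   final class UnionResolution {
       private UnionResolution() {}
   
       /**
        * Returns the first non-"null" member of a union, given its member type names
        * in declaration order (hypothetical simplification of Schema.getTypes()).
        */
       static String resolve(List<String> branchTypes) {
           for (String type : branchTypes) {
               if (!"null".equals(type)) {
                   return type;
               }
           }
           throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
       }
   
       /** Minimal self-check in the spirit of the added test coverage. */
       static void selfTest() {
           check("string".equals(resolve(List.of("null", "string"))), "null first");
           check("string".equals(resolve(List.of("string", "null"))), "null last");
           // Only the first non-null member is used, even when several exist.
           check("int".equals(resolve(List.of("null", "int", "string"))), "multi-branch");
           try {
               resolve(List.of("null"));
               check(false, "null-only union should be rejected");
           } catch (IllegalArgumentException expected) {
               // expected: there is no type to convert to
           }
       }
   
       private static void check(boolean condition, String label) {
           if (!condition) {
               throw new AssertionError(label);
           }
       }
   }
   ```
   
   The multi-branch case is worth pinning down in a test: as written in the diff, a union with several non-null members is always converted with the first one, whatever the value's runtime type.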


[GitHub] [pulsar] nicoloboschi commented on pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi commented on PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#issuecomment-1143435056

   /pulsarbot rerun-failure-checks


[GitHub] [pulsar] nicoloboschi commented on pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi commented on PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#issuecomment-1142344110

   /pulsarbot rerun-failure-checks


[GitHub] [pulsar] eolivelli commented on pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

eolivelli commented on PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#issuecomment-1142051204

   @nicoloboschi checkstyle failed, PTAL.


[GitHub] [pulsar] shibd commented on a diff in pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

shibd commented on code in PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#discussion_r885363123


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:
##########
@@ -185,5 +187,36 @@ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericO
                         + " with KeyValueSchema");
         }
     }
+
+    private static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case ENUM:
+            case STRING:
+                return avroValue.toString(); // can be a String or org.apache.avro.util.Utf8
+            case UNION:
+                for (Schema s : schema.getTypes()) {
+                    if (s.getType() == Schema.Type.NULL) {
+                        continue;
+                    }
+                    return convertAvroField(avroValue, s);
+                }
+                throw new IllegalArgumentException("Found UNION schema but it doesn't contain any type");
+            case ARRAY:
+            case BYTES:
+            case FIXED:
+            case RECORD:
+            case MAP:

Review Comment:
   I recommend adding test coverage showing that these types are not supported.
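   
   One way to cover these branches, sketched with a hypothetical `AvroKind` enum mirroring the type names in the diff and a simplified stand-in `convert` method. The diff is cut off before the body of the `ARRAY`/`BYTES`/`FIXED`/`RECORD`/`MAP` cases, so the exception type and message used here are assumptions.
   
   ```java
   import java.util.EnumSet;
   
   /** Hypothetical mirror of the Avro schema types that appear in the diff. */
   enum AvroKind { NULL, INT, LONG, DOUBLE, FLOAT, BOOLEAN, ENUM, STRING, UNION, ARRAY, BYTES, FIXED, RECORD, MAP }
   
   final class UnsupportedTypes {
       private UnsupportedTypes() {}
   
       /** The types the diff groups together as not convertible. */
       static final EnumSet<AvroKind> UNSUPPORTED =
               EnumSet.of(AvroKind.ARRAY, AvroKind.BYTES, AvroKind.FIXED, AvroKind.RECORD, AvroKind.MAP);
   
       /** Simplified stand-in: rejects unsupported kinds, returns any other value unchanged. */
       static Object convert(Object value, AvroKind kind) {
           if (UNSUPPORTED.contains(kind)) {
               // Assumed exception type; the real case bodies are truncated in the diff.
               throw new UnsupportedOperationException("Unsupported Avro schema type: " + kind);
           }
           return value;
       }
   
       /** Test-style helper: counts how many unsupported kinds are actually rejected. */
       static int countRejected() {
           int rejected = 0;
           for (AvroKind kind : UNSUPPORTED) {
               try {
                   convert(new Object(), kind);
               } catch (UnsupportedOperationException expected) {
                   rejected++;
               }
           }
           return rejected;
       }
   }
   ```
   
   A real test would assert that `countRejected()` equals the size of `UNSUPPORTED`, so adding a new type to the set without handling it keeps the check meaningful.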


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi commented on code in PR #15845:
URL: https://github.com/apache/pulsar/pull/15845#discussion_r885061090


##########
pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java:
##########
@@ -185,5 +187,35 @@ private void fillKeyValueSchemaData(org.apache.pulsar.client.api.Schema<GenericO
                         + " with KeyValueSchema");
         }
     }
+
+    private static Object convertAvroField(Object avroValue, Schema schema) {
+        switch (schema.getType()) {
+            case NULL:
+            case INT:
+            case LONG:
+            case DOUBLE:
+            case FLOAT:
+            case BOOLEAN:
+                return avroValue;
+            case BYTES:
+                // system default charset
+                return new String((byte[]) avroValue);

Review Comment:
   From the [PFA documentation on Avro types](https://dmg.org/pfa/docs/avro_types/index.html):
   
   >The bytes type is referred to as "bytes" and it can accept any byte sequence. It is therefore a generalization of a string (though neither is a subtype of the other).
   Values of type "bytes" are not arrays and cannot be changed in-place (all PFA values are immutable).
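
   The quoted point that `bytes` generalizes `string` is easy to check with the JDK alone: every Avro string has a UTF-8 encoding, but not every byte sequence is valid UTF-8. The sketch below (the `BytesVsString.isValidUtf8` helper is hypothetical) decodes with a `CharsetDecoder` set to `CodingErrorAction.REPORT`, so invalid input raises an exception. By contrast, the Javadoc of `new String(byte[])`, the call in the diff, leaves its behavior on bytes that are invalid in the default charset unspecified.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.CharacterCodingException;
   import java.nio.charset.CharsetDecoder;
   import java.nio.charset.CodingErrorAction;
   import java.nio.charset.StandardCharsets;
   
   final class BytesVsString {
       private BytesVsString() {}
   
       /** True if the bytes form valid UTF-8, i.e. they could also be an Avro string. */
       static boolean isValidUtf8(byte[] bytes) {
           CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                   .onMalformedInput(CodingErrorAction.REPORT)
                   .onUnmappableCharacter(CodingErrorAction.REPORT);
           try {
               decoder.decode(ByteBuffer.wrap(bytes));
               return true;
           } catch (CharacterCodingException e) {
               // Malformed input: these bytes have no faithful String form.
               return false;
           }
       }
   }
   ```
   
   For example, `isValidUtf8(new byte[] {(byte) 0xFF})` returns `false`, since `0xFF` never occurs in UTF-8, so such a value cannot round-trip through a `String` without loss.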
   


[GitHub] [pulsar] nicoloboschi merged pull request #15845: [improve][connector] JDBC sinks: support Avro specific datatypes

nicoloboschi merged PR #15845:
URL: https://github.com/apache/pulsar/pull/15845
