You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/05/01 17:53:37 UTC

[pulsar] branch master updated: Fixing JDBC sink to handle null fields. Also added new unit tests (#6848)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c622de5  Fixing JDBC sink to handle null fields. Also added new unit tests (#6848)
c622de5 is described below

commit c622de5116f41cdb7174647b09ad70d9f2462bbc
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Fri May 1 13:53:23 2020 -0400

    Fixing JDBC sink to handle null fields. Also added new unit tests (#6848)
    
    ### Motivation
    
    The JDBC sink does not handle `null` fields. For example, suppose the field `example` can be null: the schema registered in Pulsar allows it, and the MySQL table has a column of the same name that is configured as a double and also allows nulls. When messages are sent to the JDBC sink without that field, an exception like this is seen:
    
    ```
    21:08:38.472 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
    java.sql.SQLException: Data truncated for column 'example' at row 1
    	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) ~[mysql-connector-java-8.0.11.jar:8.0.11]
    	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:95) ~[mysql-connector-java-8.0.11.jar:8.0.11]
    	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.11.jar:8.0.11]
    	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:960) ~[mysql-connector-java-8.0.11.jar:8.0.11]
    	at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:388) ~[mysql-connector-java-8.0.11.jar:8.0.11]
    	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:202) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
    	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.lambda$open$0(JdbcAbstractSink.java:108) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_232]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
    ```
    The JDBC sink code had no handling for the case where a field is `null`. This PR adds that handling, and also adds unit tests that cover it for both binary (Avro) and JSON encoding of the schema.
    
    ### Modifications
    
    When the sink encounters a `null` field value, it calls the new `setColumnNull` method (which invokes `PreparedStatement.setNull` with the column's SQL type) so that the database row stores NULL for that column.
---
 .../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java  |  37 ++++-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    | 150 ++++++++++++++++++++-
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index de146c4..a916ca3 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.jdbc;
 
 import java.sql.PreparedStatement;
+import java.sql.Types;
 import java.util.List;
 
 import com.google.common.collect.Lists;
@@ -60,12 +61,44 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
         int index = 1;
         for (ColumnId columnId : columns) {
             String colName = columnId.getName();
-            Object obj = record.getField(colName);
-            setColumnValue(statement, index++, obj);
+            int colType = columnId.getType();
+            if (log.isDebugEnabled()) {
+                log.debug("colName: {} colType: {}", colName, colType);
+            }
+            try {
+                Object obj = record.getField(colName);
+                if (obj != null) {
+                    setColumnValue(statement, index++, obj);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Column {} is null", colName);
+                    }
+                    setColumnNull(statement, index++, colType);
+                }
+            } catch (NullPointerException e) {
+                // With JSON schema field is omitted, so get NPE
+                // In this case we want to set column to Null
+                if (log.isDebugEnabled()) {
+                    log.debug("Column {} is null", colName);
+                }
+                setColumnNull(statement, index++, colType);
+            }
+            
         }
     }
 
+    /** Binds SQL NULL to parameter {@code index} of {@code statement}; {@code type} is a {@link java.sql.Types} code. */
+    private static void setColumnNull(PreparedStatement statement, int index, int type) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Setting column value to null, statement: {}, index: {}, type: {}", statement.toString(), index, type);
+        }
+        statement.setNull(index, type);
+    }
+
     private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
+
+        log.debug("Setting column value, statement: {}, index: {}, value: {}", statement.toString(), index, value.toString());
+
         if (value instanceof Integer) {
             statement.setInt(index, (Integer) value);
         } else if (value instanceof Long) {
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index aa2a76a..3c33a16 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -34,7 +34,9 @@ import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -109,13 +111,144 @@ public class JdbcSinkTest {
         jdbcSink.close();
     }
 
+    /** Writes an Avro-encoded Foo whose field2 is unset and verifies that the stored field2 column is NULL. */
+    private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
+        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        // prepare a foo Record
+        Foo insertObj = new Foo();
+        insertObj.setField1("ValueOfField1");
+        // Not setting field2
+        // Field1 is the key and field3 is used for selecting records
+        insertObj.setField3(3);
+        AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+        byte[] insertBytes = schema.encode(insertObj);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+            .message(insertMessage)
+            .topicName("fake_topic_name")
+            .ackFunction(() -> future.complete(null))
+            .build();
+
+        GenericSchema<GenericRecord> genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+        when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+        when(insertMessage.getProperties()).thenReturn(actionProperties);
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                insertObj.toString(),
+                insertMessage.getValue().toString(),
+                insertRecord.getValue().toString());
+
+        // write should succeed.
+        jdbcSink.write(insertRecord);
+        log.info("executed write");
+        // block until the ack callback completes the future (backend flush complete)
+        future.get(1, TimeUnit.SECONDS);
+
+        // value has been written to db, read it out and verify.
+        String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+        int count = sqliteUtils.select(querySql, (resultSet) -> {
+            Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+            // Check the stored column, not the input POJO, so the test fails if NULL was not written.
+            Assert.assertNull(resultSet.getString(2));
+            Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+        });
+        Assert.assertEquals(count, 1);
+    }
+
+    /** Writes a JSON-encoded Foo with all three fields set and verifies each stored column value. */
+    private void testOpenAndWriteSinkJson(Map<String, String> actionProperties) throws Exception {
+        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        // prepare a foo Record
+        Foo insertObj = new Foo();
+        insertObj.setField1("ValueOfField1");
+        insertObj.setField2("ValueOfField2");
+        insertObj.setField3(3);
+        JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+        byte[] insertBytes = schema.encode(insertObj);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+            .message(insertMessage)
+            .topicName("fake_topic_name")
+            .ackFunction(() -> future.complete(null))
+            .build();
+
+        // decode with a generic schema built from the JSON schema's SchemaInfo
+        GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
+        when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
+        when(insertMessage.getProperties()).thenReturn(actionProperties);
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                insertObj.toString(),
+                insertMessage.getValue().toString(),
+                insertRecord.getValue().toString());
+
+        // write should succeed.
+        jdbcSink.write(insertRecord);
+        log.info("executed write");
+        // block until the ack callback completes the future (backend flush complete)
+        future.get(1, TimeUnit.SECONDS);
+
+        // value has been written to db, read it out and verify.
+        String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+        int count = sqliteUtils.select(querySql, (resultSet) -> {
+            Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+            Assert.assertEquals(insertObj.getField2(), resultSet.getString(2));
+            Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+        });
+        Assert.assertEquals(count, 1);
+    }
+
+    /** Writes a JSON-encoded Foo whose field2 is unset and verifies that the stored field2 column is NULL. */
+    private void testOpenAndWriteSinkNullValueJson(Map<String, String> actionProperties) throws Exception {
+        Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+        // prepare a foo Record
+        Foo insertObj = new Foo();
+        insertObj.setField1("ValueOfField1");
+        // Not setting field2
+        // Field1 is the key and field3 is used for selecting records
+        insertObj.setField3(3);
+        JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+        byte[] insertBytes = schema.encode(insertObj);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+            .message(insertMessage)
+            .topicName("fake_topic_name")
+            .ackFunction(() -> future.complete(null))
+            .build();
+
+        // decode with a generic schema built from the JSON schema's SchemaInfo
+        GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
+        when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
+        when(insertMessage.getProperties()).thenReturn(actionProperties);
+        log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+                insertObj.toString(),
+                insertMessage.getValue().toString(),
+                insertRecord.getValue().toString());
+
+        // write should succeed.
+        jdbcSink.write(insertRecord);
+        log.info("executed write");
+        // block until the ack callback completes the future (backend flush complete)
+        future.get(1, TimeUnit.SECONDS);
+
+        // value has been written to db, read it out and verify.
+        String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+        int count = sqliteUtils.select(querySql, (resultSet) -> {
+            Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+            // Check the stored column, not the input POJO, so the test fails if NULL was not written.
+            Assert.assertNull(resultSet.getString(2));
+            Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+        });
+        Assert.assertEquals(count, 1);
+    }
+
     private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
         Message<GenericRecord> insertMessage = mock(MessageImpl.class);
         GenericSchema<GenericRecord> genericAvroSchema;
         // prepare a foo Record
         Foo insertObj = new Foo();
         insertObj.setField1("ValueOfField1");
-        insertObj.setField2("ValueOfField1");
+        insertObj.setField2("ValueOfField2");
         insertObj.setField3(3);
         AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
@@ -163,6 +296,21 @@ public class JdbcSinkTest {
     }
 
     @Test
+    public void TestNoActionNullValue() throws Exception {
+        testOpenAndWriteSinkNullValue(ImmutableMap.of("ACTION", "INSERT"));
+    }
+
+    @Test
+    public void TestNoActionNullValueJson() throws Exception {
+        testOpenAndWriteSinkNullValueJson(ImmutableMap.of("ACTION", "INSERT"));
+    }
+
+    @Test
+    public void TestNoActionJson() throws Exception {
+        testOpenAndWriteSinkJson(ImmutableMap.of("ACTION", "INSERT"));
+    }
+
+    @Test
     public void TestUnknownAction() throws Exception {
         Record<GenericRecord> recordRecord = mock(Record.class);
         when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));