You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:15 UTC
[pulsar] 29/38: Fixing JDBC sink to handle null fields. Also added
new unit tests (#6848)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 53f74c7fd9eb0bf4a81f09d2dc1ab0a7895e1b97
Author: Chris Bartholomew <c_...@yahoo.com>
AuthorDate: Fri May 1 13:53:23 2020 -0400
Fixing JDBC sink to handle null fields. Also added new unit tests (#6848)
### Motivation
The JDBC sink does not handle `null` fields. For example, suppose a field named `example` can be null: the schema registered in Pulsar allows it, and the MySQL table has a column of the same name that is configured as a double and also allows nulls. When messages are sent to the JDBC sink without that field, an exception like this is seen:
```
21:08:38.472 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception
java.sql.SQLException: Data truncated for column 'example' at row 1
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:95) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:960) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:388) ~[mysql-connector-java-8.0.11.jar:8.0.11]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:202) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.lambda$open$0(JdbcAbstractSink.java:108) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_232]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
```
The JDBC sink code had no handling for the case where a field is `null`. This PR adds code to handle that case. It also adds unit tests that cover it for both binary (Avro) and JSON encoding of the schema.
### Modifications
When the sink encounters a `null` field value it uses the `setColumnNull` method to properly reflect this in the database row.
(cherry picked from commit c622de5116f41cdb7174647b09ad70d9f2462bbc)
---
.../apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java | 37 ++++-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 150 ++++++++++++++++++++-
2 files changed, 184 insertions(+), 3 deletions(-)
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index de146c4..a916ca3 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.jdbc;
import java.sql.PreparedStatement;
+import java.sql.Types;
import java.util.List;
import com.google.common.collect.Lists;
@@ -60,12 +61,44 @@ public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
int index = 1;
for (ColumnId columnId : columns) {
String colName = columnId.getName();
- Object obj = record.getField(colName);
- setColumnValue(statement, index++, obj);
+ int colType = columnId.getType();
+ if (log.isDebugEnabled()) {
+ log.debug("colName: {} colType: {}", colName, colType);
+ }
+ try {
+ Object obj = record.getField(colName);
+ if (obj != null) {
+ setColumnValue(statement, index++, obj);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Column {} is null", colName);
+ }
+ setColumnNull(statement, index++, colType);
+ }
+ } catch (NullPointerException e) {
+ // With JSON schema field is omitted, so get NPE
+ // In this case we want to set column to Null
+ if (log.isDebugEnabled()) {
+ log.debug("Column {} is null", colName);
+ }
+ setColumnNull(statement, index++, colType);
+ }
+
}
}
+ /**
+  * Binds SQL NULL to one parameter of the prepared statement.
+  *
+  * @param statement the statement whose parameter is being bound
+  * @param index     1-based parameter index
+  * @param type      SQL type code passed to {@link PreparedStatement#setNull(int, int)}
+  * @throws Exception if the driver rejects the binding
+  */
+ private static void setColumnNull(PreparedStatement statement, int index, int type) throws Exception {
+     if (log.isDebugEnabled()) {
+         // Three placeholders, three arguments: log the SQL type since there is no value to show.
+         log.debug("Setting column value to null, statement: {}, index: {}, type: {}", statement, index, type);
+     }
+     statement.setNull(index, type);
+ }
+
private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception {
+
+ log.debug("Setting column value, statement: {}, index: {}, value: {}", statement.toString(), index, value.toString());
+
if (value instanceof Integer) {
statement.setInt(index, (Integer) value);
} else if (value instanceof Long) {
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index aa2a76a..3c33a16 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -34,7 +34,9 @@ import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.testng.Assert;
@@ -109,13 +111,144 @@ public class JdbcSinkTest {
jdbcSink.close();
}
+ /**
+  * Writes an Avro-encoded {@code Foo} whose field2 is left unset and verifies that the
+  * row read back from the database has a NULL in the second column.
+  *
+  * @param actionProperties message properties returned by the mocked message
+  * @throws Exception if encoding, writing, waiting for the ack, or querying fails
+  */
+ private void testOpenAndWriteSinkNullValue(Map<String, String> actionProperties) throws Exception {
+     Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+     // prepare a foo Record
+     Foo insertObj = new Foo();
+     insertObj.setField1("ValueOfField1");
+     // Not setting field2
+     // Field1 is the key and field3 is used for selecting records
+     insertObj.setField3(3);
+     AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+     byte[] insertBytes = schema.encode(insertObj);
+     CompletableFuture<Void> future = new CompletableFuture<>();
+     Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+         .message(insertMessage)
+         .topicName("fake_topic_name")
+         .ackFunction(() -> future.complete(null))
+         .build();
+
+     GenericSchema<GenericRecord> genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+     when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
+     when(insertMessage.getProperties()).thenReturn(actionProperties);
+     log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+         insertObj.toString(),
+         insertMessage.getValue().toString(),
+         insertRecord.getValue().toString());
+
+     // write should succeed.
+     jdbcSink.write(insertRecord);
+     log.info("executed write");
+     // wait for the backend flush to complete; the ack callback completes the future
+     future.get(1, TimeUnit.SECONDS);
+
+     // value has been written to db, read it out and verify.
+     String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+     int count = sqliteUtils.select(querySql, (resultSet) -> {
+         Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+         // Check the stored column, not the input object: the unset field must be NULL in the row.
+         Assert.assertNull(resultSet.getString(2));
+         Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+     });
+     Assert.assertEquals(count, 1);
+ }
+
+ /**
+  * Writes a JSON-encoded {@code Foo} with all three fields set and verifies that the
+  * row read back from the database matches the inserted object.
+  *
+  * @param actionProperties message properties returned by the mocked message
+  * @throws Exception if encoding, writing, waiting for the ack, or querying fails
+  */
+ private void testOpenAndWriteSinkJson(Map<String, String> actionProperties) throws Exception {
+     Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+     // prepare a foo Record
+     Foo insertObj = new Foo();
+     insertObj.setField1("ValueOfField1");
+     insertObj.setField2("ValueOfField2");
+     insertObj.setField3(3);
+     JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+     byte[] insertBytes = schema.encode(insertObj);
+     CompletableFuture<Void> future = new CompletableFuture<>();
+     Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+         .message(insertMessage)
+         .topicName("fake_topic_name")
+         .ackFunction(() -> future.complete(null))
+         .build();
+
+     GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
+     when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
+     when(insertMessage.getProperties()).thenReturn(actionProperties);
+     log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+         insertObj.toString(),
+         insertMessage.getValue().toString(),
+         insertRecord.getValue().toString());
+
+     // write should succeed.
+     jdbcSink.write(insertRecord);
+     log.info("executed write");
+     // wait for the backend flush to complete; the ack callback completes the future
+     future.get(1, TimeUnit.SECONDS);
+
+     // value has been written to db, read it out and verify.
+     String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+     int count = sqliteUtils.select(querySql, (resultSet) -> {
+         Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+         Assert.assertEquals(insertObj.getField2(), resultSet.getString(2));
+         Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+     });
+     Assert.assertEquals(count, 1);
+ }
+
+ /**
+  * Writes a JSON-encoded {@code Foo} whose field2 is left unset and verifies that the
+  * row read back from the database has a NULL in the second column.
+  *
+  * @param actionProperties message properties returned by the mocked message
+  * @throws Exception if encoding, writing, waiting for the ack, or querying fails
+  */
+ private void testOpenAndWriteSinkNullValueJson(Map<String, String> actionProperties) throws Exception {
+     Message<GenericRecord> insertMessage = mock(MessageImpl.class);
+     // prepare a foo Record
+     Foo insertObj = new Foo();
+     insertObj.setField1("ValueOfField1");
+     // Not setting field2
+     // Field1 is the key and field3 is used for selecting records
+     insertObj.setField3(3);
+     JSONSchema<Foo> schema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
+
+     byte[] insertBytes = schema.encode(insertObj);
+     CompletableFuture<Void> future = new CompletableFuture<>();
+     Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
+         .message(insertMessage)
+         .topicName("fake_topic_name")
+         .ackFunction(() -> future.complete(null))
+         .build();
+
+     GenericSchema<GenericRecord> decodeSchema = GenericSchemaImpl.of(schema.getSchemaInfo());
+     when(insertMessage.getValue()).thenReturn(decodeSchema.decode(insertBytes));
+     when(insertMessage.getProperties()).thenReturn(actionProperties);
+     log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
+         insertObj.toString(),
+         insertMessage.getValue().toString(),
+         insertRecord.getValue().toString());
+
+     // write should succeed.
+     jdbcSink.write(insertRecord);
+     log.info("executed write");
+     // wait for the backend flush to complete; the ack callback completes the future
+     future.get(1, TimeUnit.SECONDS);
+
+     // value has been written to db, read it out and verify.
+     String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
+     int count = sqliteUtils.select(querySql, (resultSet) -> {
+         Assert.assertEquals(insertObj.getField1(), resultSet.getString(1));
+         // Check the stored column, not the input object: the unset field must be NULL in the row.
+         Assert.assertNull(resultSet.getString(2));
+         Assert.assertEquals(insertObj.getField3(), resultSet.getInt(3));
+     });
+     Assert.assertEquals(count, 1);
+ }
+
private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
Message<GenericRecord> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Foo insertObj = new Foo();
insertObj.setField1("ValueOfField1");
- insertObj.setField2("ValueOfField1");
+ insertObj.setField2("ValueOfField2");
insertObj.setField3(3);
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
@@ -163,6 +296,21 @@ public class JdbcSinkTest {
}
@Test
+ public void TestNoActionNullValue() throws Exception {
+ testOpenAndWriteSinkNullValue(ImmutableMap.of("ACTION", "INSERT"));
+ }
+
+ @Test
+ public void TestNoActionNullValueJson() throws Exception {
+ testOpenAndWriteSinkNullValueJson(ImmutableMap.of("ACTION", "INSERT"));
+ }
+
+ @Test
+ public void TestNoActionJson() throws Exception {
+ testOpenAndWriteSinkJson(ImmutableMap.of("ACTION", "INSERT"));
+ }
+
+ @Test
public void TestUnknownAction() throws Exception {
Record<GenericRecord> recordRecord = mock(Record.class);
when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));