You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/13 04:32:42 UTC
[incubator-pulsar] branch master updated: [io] jdbc connector uses Schema.AUTO (#2538)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7364563 [io] jdbc connector uses Schema.AUTO (#2538)
7364563 is described below
commit 73645631849ffce2f144f23d861763fe52a25af9
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Sep 12 21:32:39 2018 -0700
[io] jdbc connector uses Schema.AUTO (#2538)
* [schema] enable Schema.AUTO if functions or connectors are using GenericRecord
* auto schema in jdbc sink
* [io] jdbc connector uses Schema.AUTO
---
pulsar-functions/java-examples/pom.xml | 1 -
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 3 --
...AvroSchemaSink.java => JdbcAutoSchemaSink.java} | 39 +++++-----------------
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 3 --
.../resources/META-INF/services/pulsar-io.yaml | 2 +-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 17 ++++++----
.../integration/functions/PulsarFunctionsTest.java | 21 +++++++++---
.../tests/integration/io/JdbcSinkTester.java | 10 +++---
.../pulsar/tests/integration/io/SinkTester.java | 5 +++
9 files changed, 47 insertions(+), 54 deletions(-)
diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml
index b077dea..747acff 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,7 +41,6 @@
<artifactId>pulsar-functions-api</artifactId>
<version>${project.version}</version>
</dependency>
-
</dependencies>
</project>
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 425fb57..91d043d 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -54,8 +54,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private JdbcUtils.TableId tableId;
private PreparedStatement insertStatement;
- // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
- protected String schema;
protected JdbcUtils.TableDefinition tableDefinition;
// for flush
@@ -89,7 +87,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
connection.setAutoCommit(false);
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
- schema = jdbcSinkConfig.getSchema();
tableName = jdbcSinkConfig.getTableName();
tableId = JdbcUtils.getTableId(connection, tableName);
tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
similarity index 61%
rename from pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
rename to pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index ec28220..c18dc81 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,50 +20,27 @@
package org.apache.pulsar.io.jdbc;
import java.sql.PreparedStatement;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
/**
- * A Simple Jdbc sink, which assume input Record as AvroSchema format
+ * A Simple Jdbc sink, which interprets input Record in generic record.
*/
@Slf4j
-public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
-
- private Schema avroSchema = null;
- private DatumReader<GenericRecord> reader = null;
-
+public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
@Override
- public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
- super.open(config, sinkContext);
- // get reader, and read value out as GenericRecord
- if (avroSchema == null || reader == null) {
- avroSchema = Schema.parse(schema);
- reader = new GenericDatumReader<>(avroSchema);
- }
- log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString());
- }
-
-
public void bindValue(PreparedStatement statement,
- Record<byte[]> message) throws Exception {
+ Record<GenericRecord> message) throws Exception {
- byte[] value = message.getValue();
- GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+ GenericRecord record = message.getValue();
int index = 1;
for (ColumnId columnId : tableDefinition.getColumns()) {
String colName = columnId.getName();
- Object obj = record.get(colName);
+ Object obj = record.getField(colName);
setColumnValue(statement, index++, obj);
log.info("set column value: {}", obj.toString());
}
@@ -80,8 +57,8 @@ public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
statement.setFloat(index, (Float) value);
} else if (value instanceof Boolean) {
statement.setBoolean(index, (Boolean) value);
- } else if (value instanceof Utf8) {
- statement.setString(index, ((Utf8)value).toString());
+ } else if (value instanceof String) {
+ statement.setString(index, (String )value);
} else if (value instanceof Short) {
statement.setShort(index, (Short) value);
} else {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3419811..6cc95d6 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -44,9 +44,6 @@ public class JdbcSinkConfig implements Serializable {
private String jdbcUrl;
private String tableName;
- // schema for input topic
- private String schema;
-
// Optional
private int timeoutMs = 500;
private int batchSize = 200;
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
index d9d06bd..38d9e3c 100644
--- a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
name: jdbc
description: Jdbc sink
-sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 33bb859..84f6a82 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -28,9 +28,11 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.testng.Assert;
@@ -69,7 +71,7 @@ public class JdbcSinkTest {
@Test
public void TestOpenAndWriteSink() throws Exception {
- JdbcAvroSchemaSink jdbcSink;
+ JdbcAutoSchemaSink jdbcSink;
Map<String, Object> conf;
String tableName = "TestOpenAndWriteSink";
@@ -78,7 +80,7 @@ public class JdbcSinkTest {
conf.put("jdbcUrl", jdbcUrl);
conf.put("tableName", tableName);
- jdbcSink = new JdbcAvroSchemaSink();
+ jdbcSink = new JdbcAutoSchemaSink();
sqliteUtils.createTable(
"CREATE TABLE " + tableName + "(" +
@@ -94,13 +96,14 @@ public class JdbcSinkTest {
obj.setField2("ValueOfField1");
obj.setField3(3);
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
- conf.put("schema", new String(schema.getSchemaInfo().getSchema()));
- log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
- Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
- Record<byte[]> record = PulsarRecord.<byte[]>builder()
+ AutoSchema autoSchema = new AutoSchema();
+ autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+ Message<GenericRecord> message = new MessageImpl("77:777", conf, payload, autoSchema);
+ Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1cb388e..5f11eef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.Test;
@@ -103,6 +104,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// prepare the testing environment for sink
prepareSink(tester);
+ ensureSubscriptionCreated(
+ inputTopicName,
+ String.format("public/default/%s", sinkName),
+ tester.getInputTopicSchema());
+
// submit the sink connector
submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
@@ -251,13 +257,14 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// This for JdbcSinkTester
protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
- int numMessages, Schema schema) throws Exception {
+ int numMessages,
+ Schema<Foo> schema) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
@Cleanup
- Producer<String> producer = client.newProducer(Schema.STRING)
+ Producer<Foo> producer = client.newProducer(schema)
.topic(inputTopicName)
.create();
LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
@@ -273,7 +280,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
kvs.put(key, value);
producer.newMessage()
.key(key)
- .value(value)
+ .value(obj)
.send();
}
return kvs;
@@ -635,7 +642,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
+ ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+ }
+ private static <T> void ensureSubscriptionCreated(String inputTopicName,
+ String subscriptionName,
+ Schema<T> inputTopicSchema)
+ throws Exception {
// ensure the function subscription exists before we start producing messages
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -643,7 +656,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
.topic(inputTopicName)
.subscriptionType(SubscriptionType.Shared)
- .subscriptionName(String.format("public/default/%s", functionName))
+ .subscriptionName(subscriptionName)
.subscribe()) {
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index e4aa401..7c14ba9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -31,6 +31,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
@@ -69,14 +70,15 @@ public class JdbcSinkTester extends SinkTester {
sinkConfig.put("userName", "test");
sinkConfig.put("password", "test");
sinkConfig.put("tableName", tableName);
-
- // prepare schema
- sinkConfig.put("schema", new String(schema.getSchemaInfo().getSchema()));
- log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
sinkConfig.put("batchSize", 1);
}
@Override
+ public Schema<?> getInputTopicSchema() {
+ return schema;
+ }
+
+ @Override
public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
GenericContainer<?> container = containers.get("mysql");
checkState(container instanceof MySQLContainer,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 5e060d5..d2917e6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.io;
import java.util.Map;
import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
import org.testcontainers.containers.GenericContainer;
import org.testng.collections.Maps;
@@ -57,6 +58,10 @@ public abstract class SinkTester {
this.sinkConfig = Maps.newHashMap();
}
+ public Schema<?> getInputTopicSchema() {
+ return Schema.STRING;
+ }
+
public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices);
public SinkType sinkType() {