You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/13 04:32:42 UTC

[incubator-pulsar] branch master updated: [io] jdbc connector uses Schema.AUTO (#2538)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7364563  [io] jdbc connector uses Schema.AUTO (#2538)
7364563 is described below

commit 73645631849ffce2f144f23d861763fe52a25af9
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Sep 12 21:32:39 2018 -0700

    [io] jdbc connector uses Schema.AUTO (#2538)
    
    * [schema] enable Schema.AUTO if functions or connectors are using GenericRecord
    
    * auto schema in jdbc sink
    
    * [io] jdbc connector uses Schema.AUTO
---
 pulsar-functions/java-examples/pom.xml             |  1 -
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    |  3 --
 ...AvroSchemaSink.java => JdbcAutoSchemaSink.java} | 39 +++++-----------------
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  3 --
 .../resources/META-INF/services/pulsar-io.yaml     |  2 +-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    | 17 ++++++----
 .../integration/functions/PulsarFunctionsTest.java | 21 +++++++++---
 .../tests/integration/io/JdbcSinkTester.java       | 10 +++---
 .../pulsar/tests/integration/io/SinkTester.java    |  5 +++
 9 files changed, 47 insertions(+), 54 deletions(-)

diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml
index b077dea..747acff 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,7 +41,6 @@
       <artifactId>pulsar-functions-api</artifactId>
       <version>${project.version}</version>
     </dependency>
-
   </dependencies>
 
 </project>
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 425fb57..91d043d 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -54,8 +54,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
     private JdbcUtils.TableId tableId;
     private PreparedStatement insertStatement;
 
-    // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
-    protected String schema;
     protected JdbcUtils.TableDefinition tableDefinition;
 
     // for flush
@@ -89,7 +87,6 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
         connection.setAutoCommit(false);
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
 
-        schema = jdbcSinkConfig.getSchema();
         tableName = jdbcSinkConfig.getTableName();
         tableId = JdbcUtils.getTableId(connection, tableName);
         tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
similarity index 61%
rename from pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
rename to pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index ec28220..c18dc81 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,50 +20,27 @@
 package org.apache.pulsar.io.jdbc;
 
 import java.sql.PreparedStatement;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
 /**
- * A Simple Jdbc sink, which assume input Record as AvroSchema format
+ * A simple JDBC sink that interprets the value of each input Record as a generic record.
  */
 @Slf4j
-public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
-
-    private Schema avroSchema = null;
-    private DatumReader<GenericRecord> reader = null;
-
+public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
     @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        super.open(config, sinkContext);
-        // get reader, and read value out as GenericRecord
-        if (avroSchema == null || reader == null) {
-            avroSchema = Schema.parse(schema);
-            reader = new GenericDatumReader<>(avroSchema);
-        }
-        log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString());
-    }
-
-
     public void bindValue(PreparedStatement statement,
-                          Record<byte[]> message) throws Exception {
+                          Record<GenericRecord> message) throws Exception {
 
-        byte[] value = message.getValue();
-        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+        GenericRecord record = message.getValue();
 
         int index = 1;
         for (ColumnId columnId : tableDefinition.getColumns()) {
             String colName = columnId.getName();
-            Object obj = record.get(colName);
+            Object obj = record.getField(colName);
             setColumnValue(statement, index++, obj);
             log.info("set column value: {}", obj.toString());
         }
@@ -80,8 +57,8 @@ public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
             statement.setFloat(index, (Float) value);
         } else if (value instanceof Boolean) {
             statement.setBoolean(index, (Boolean) value);
-        } else if (value instanceof Utf8) {
-            statement.setString(index, ((Utf8)value).toString());
+        } else if (value instanceof String) {
+            statement.setString(index, (String )value);
         } else if (value instanceof Short) {
             statement.setShort(index, (Short) value);
         } else {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3419811..6cc95d6 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -44,9 +44,6 @@ public class JdbcSinkConfig implements Serializable {
     private String jdbcUrl;
     private String tableName;
 
-    // schema for input topic
-    private String schema;
-
     // Optional
     private int timeoutMs = 500;
     private int batchSize = 200;
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
index d9d06bd..38d9e3c 100644
--- a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
 
 name: jdbc
 description: Jdbc sink
-sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 33bb859..84f6a82 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -28,9 +28,11 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -69,7 +71,7 @@ public class JdbcSinkTest {
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
-        JdbcAvroSchemaSink jdbcSink;
+        JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
         String tableName = "TestOpenAndWriteSink";
 
@@ -78,7 +80,7 @@ public class JdbcSinkTest {
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", tableName);
 
-        jdbcSink = new JdbcAvroSchemaSink();
+        jdbcSink = new JdbcAutoSchemaSink();
 
         sqliteUtils.createTable(
             "CREATE TABLE " + tableName + "(" +
@@ -94,13 +96,14 @@ public class JdbcSinkTest {
         obj.setField2("ValueOfField1");
         obj.setField3(3);
         AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-        conf.put("schema",  new String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
-        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+        AutoSchema autoSchema = new AutoSchema();
+        autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+        Message<GenericRecord> message = new MessageImpl("77:777", conf, payload, autoSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1cb388e..5f11eef 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
@@ -103,6 +104,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         // prepare the testing environment for sink
         prepareSink(tester);
 
+        ensureSubscriptionCreated(
+            inputTopicName,
+            String.format("public/default/%s", sinkName),
+            tester.getInputTopicSchema());
+
         // submit the sink connector
         submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
 
@@ -251,13 +257,14 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
     // This for JdbcSinkTester
     protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
-                                                              int numMessages,  Schema schema) throws Exception {
+                                                                    int numMessages,
+                                                                    Schema<Foo> schema) throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
         @Cleanup
-        Producer<String> producer = client.newProducer(Schema.STRING)
+        Producer<Foo> producer = client.newProducer(schema)
             .topic(inputTopicName)
             .create();
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
@@ -273,7 +280,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             kvs.put(key, value);
             producer.newMessage()
                 .key(key)
-                .value(value)
+                .value(obj)
                 .send();
         }
         return kvs;
@@ -635,7 +642,13 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
 
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+    }
 
+    private static <T> void ensureSubscriptionCreated(String inputTopicName,
+                                                      String subscriptionName,
+                                                      Schema<T> inputTopicSchema)
+            throws Exception {
         // ensure the function subscription exists before we start producing messages
         try (PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -643,7 +656,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
                 .topic(inputTopicName)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionName(String.format("public/default/%s", functionName))
+                .subscriptionName(subscriptionName)
                 .subscribe()) {
             }
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index e4aa401..7c14ba9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -31,6 +31,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MySQLContainer;
@@ -69,14 +70,15 @@ public class JdbcSinkTester extends SinkTester {
         sinkConfig.put("userName", "test");
         sinkConfig.put("password", "test");
         sinkConfig.put("tableName", tableName);
-
-        // prepare schema
-        sinkConfig.put("schema",  new String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
         sinkConfig.put("batchSize", 1);
     }
 
     @Override
+    public Schema<?> getInputTopicSchema() {
+        return schema;
+    }
+
+    @Override
     public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
         GenericContainer<?> container = containers.get("mysql");
         checkState(container instanceof MySQLContainer,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 5e060d5..d2917e6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.io;
 
 import java.util.Map;
 import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
@@ -57,6 +58,10 @@ public abstract class SinkTester {
         this.sinkConfig = Maps.newHashMap();
     }
 
+    public Schema<?> getInputTopicSchema() {
+        return Schema.STRING;
+    }
+
     public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices);
 
     public SinkType sinkType() {