You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/13 04:32:41 UTC

[GitHub] merlimat closed pull request #2538: [io] jdbc connector uses Schema.AUTO

merlimat closed pull request #2538: [io] jdbc connector uses Schema.AUTO
URL: https://github.com/apache/incubator-pulsar/pull/2538
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it for a merged fork pull request:

diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml
index b077dea2f0..747acff2a1 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,7 +41,6 @@
       <artifactId>pulsar-functions-api</artifactId>
       <version>${project.version}</version>
     </dependency>
-
   </dependencies>
 
 </project>
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 425fb57ac1..91d043db5f 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -54,8 +54,6 @@
     private JdbcUtils.TableId tableId;
     private PreparedStatement insertStatement;
 
-    // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic)
-    protected String schema;
     protected JdbcUtils.TableDefinition tableDefinition;
 
     // for flush
@@ -89,7 +87,6 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
         connection.setAutoCommit(false);
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
 
-        schema = jdbcSinkConfig.getSchema();
         tableName = jdbcSinkConfig.getTableName();
         tableId = JdbcUtils.getTableId(connection, tableName);
         tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
similarity index 61%
rename from pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
rename to pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index ec2822010e..c18dc81b82 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,50 +20,27 @@
 package org.apache.pulsar.io.jdbc;
 
 import java.sql.PreparedStatement;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
 /**
- * A Simple Jdbc sink, which assume input Record as AvroSchema format
+ * A Simple Jdbc sink, which interprets input Record in generic record.
  */
 @Slf4j
-public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
-
-    private Schema avroSchema = null;
-    private DatumReader<GenericRecord> reader = null;
-
+public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
     @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
-        super.open(config, sinkContext);
-        // get reader, and read value out as GenericRecord
-        if (avroSchema == null || reader == null) {
-            avroSchema = Schema.parse(schema);
-            reader = new GenericDatumReader<>(avroSchema);
-        }
-        log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString());
-    }
-
-
     public void bindValue(PreparedStatement statement,
-                          Record<byte[]> message) throws Exception {
+                          Record<GenericRecord> message) throws Exception {
 
-        byte[] value = message.getValue();
-        GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+        GenericRecord record = message.getValue();
 
         int index = 1;
         for (ColumnId columnId : tableDefinition.getColumns()) {
             String colName = columnId.getName();
-            Object obj = record.get(colName);
+            Object obj = record.getField(colName);
             setColumnValue(statement, index++, obj);
             log.info("set column value: {}", obj.toString());
         }
@@ -80,8 +57,8 @@ private static void setColumnValue(PreparedStatement statement, int index, Objec
             statement.setFloat(index, (Float) value);
         } else if (value instanceof Boolean) {
             statement.setBoolean(index, (Boolean) value);
-        } else if (value instanceof Utf8) {
-            statement.setString(index, ((Utf8)value).toString());
+        } else if (value instanceof String) {
+            statement.setString(index, (String )value);
         } else if (value instanceof Short) {
             statement.setShort(index, (Short) value);
         } else {
diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3419811e0a..6cc95d6bac 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -44,9 +44,6 @@
     private String jdbcUrl;
     private String tableName;
 
-    // schema for input topic
-    private String schema;
-
     // Optional
     private int timeoutMs = 500;
     private int batchSize = 200;
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
index d9d06bde47..38d9e3c295 100644
--- a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
 
 name: jdbc
 description: Jdbc sink
-sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 33bb859547..84f6a82f03 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -28,9 +28,11 @@
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -69,7 +71,7 @@ public void tearDown() throws Exception {
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
-        JdbcAvroSchemaSink jdbcSink;
+        JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
         String tableName = "TestOpenAndWriteSink";
 
@@ -78,7 +80,7 @@ public void TestOpenAndWriteSink() throws Exception {
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", tableName);
 
-        jdbcSink = new JdbcAvroSchemaSink();
+        jdbcSink = new JdbcAutoSchemaSink();
 
         sqliteUtils.createTable(
             "CREATE TABLE " + tableName + "(" +
@@ -94,13 +96,14 @@ public void TestOpenAndWriteSink() throws Exception {
         obj.setField2("ValueOfField1");
         obj.setField3(3);
         AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-        conf.put("schema",  new String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES);
-        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+        AutoSchema autoSchema = new AutoSchema();
+        autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+        Message<GenericRecord> message = new MessageImpl("77:777", conf, payload, autoSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0bf9ded6b3..b1bfc8c073 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
@@ -98,6 +99,11 @@ private void testSink(SinkTester tester, boolean builtin) throws Exception {
         // prepare the testing environment for sink
         prepareSink(tester);
 
+        ensureSubscriptionCreated(
+            inputTopicName,
+            String.format("public/default/%s", sinkName),
+            tester.getInputTopicSchema());
+
         // submit the sink connector
         submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName);
 
@@ -246,13 +252,14 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
 
     // This for JdbcSinkTester
     protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
-                                                              int numMessages,  Schema schema) throws Exception {
+                                                                    int numMessages,
+                                                                    Schema<Foo> schema) throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
         @Cleanup
-        Producer<String> producer = client.newProducer(Schema.STRING)
+        Producer<Foo> producer = client.newProducer(schema)
             .topic(inputTopicName)
             .create();
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
@@ -268,7 +275,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
             kvs.put(key, value);
             producer.newMessage()
                 .key(key)
-                .value(value)
+                .value(obj)
                 .send();
         }
         return kvs;
@@ -630,7 +637,13 @@ private static void submitExclamationFunction(Runtime runtime,
             commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
 
+        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+    }
 
+    private static <T> void ensureSubscriptionCreated(String inputTopicName,
+                                                      String subscriptionName,
+                                                      Schema<T> inputTopicSchema)
+            throws Exception {
         // ensure the function subscription exists before we start producing messages
         try (PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -638,7 +651,7 @@ private static void submitExclamationFunction(Runtime runtime,
             try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
                 .topic(inputTopicName)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionName(String.format("public/default/%s", functionName))
+                .subscriptionName(subscriptionName)
                 .subscribe()) {
             }
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index e4aa401042..7c14ba96c0 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -31,6 +31,7 @@
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MySQLContainer;
@@ -69,13 +70,14 @@ public JdbcSinkTester() {
         sinkConfig.put("userName", "test");
         sinkConfig.put("password", "test");
         sinkConfig.put("tableName", tableName);
-
-        // prepare schema
-        sinkConfig.put("schema",  new String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
         sinkConfig.put("batchSize", 1);
     }
 
+    @Override
+    public Schema<?> getInputTopicSchema() {
+        return schema;
+    }
+
     @Override
     public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
         GenericContainer<?> container = containers.get("mysql");
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 7f4b2d9a1b..8b61ff2118 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
@@ -56,6 +57,10 @@ public SinkTester(String sinkArchive, String sinkClassName) {
         this.sinkConfig = Maps.newHashMap();
     }
 
+    public Schema<?> getInputTopicSchema() {
+        return Schema.STRING;
+    }
+
     public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices);
 
     public SinkType sinkType() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services