You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/24 18:07:17 UTC

[pulsar] branch master updated: [pulsar-flink] add streaming connectors as a Pulsar stream that serializes data in Avro format (#3231)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1a148  [pulsar-flink] add streaming connectors as a Pulsar stream that serializes data in Avro format (#3231)
7a1a148 is described below

commit 7a1a14855b7a08a4dd22c2f97b4bb79de33731b7
Author: wpl <12...@qq.com>
AuthorDate: Tue Dec 25 02:07:12 2018 +0800

    [pulsar-flink] add streaming connectors as a Pulsar stream that serializes data in Avro format (#3231)
    
    ### Motivation
    
    Add a Flink streaming connector (table sink) that writes data to Pulsar in Avro format.
    
    
    ### Modifications
    
    Add the class PulsarAvroTableSink, together with word-count examples for the Avro and JSON table sinks and their unit tests.
    
    ### Result
    
    Adds an append-only table sink that serializes data in Avro format.
---
 .gitignore                                         |   4 +-
 examples/flink-consumer-source/pom.xml             |  12 ++
 .../example/FlinkPulsarBatchAvroSinkExample.java   |   2 +-
 ...lsarConsumerSourceWordCountToAvroTableSink.java | 120 +++++++++++++++
 ...lsarConsumerSourceWordCountToJsonTableSink.java | 133 ++++++++++++++++
 .../src/main/resources/avro/NasaMission.avsc       |  12 +-
 .../FlinkPulsarBatchAvroSinkScalaExample.scala     |   2 +-
 pulsar-flink/pom.xml                               |   4 +
 .../connectors/pulsar/PulsarAvroTableSink.java     | 169 +++++++++++++++++++++
 .../serialization/AvroSerializationSchemaTest.java |   2 +-
 .../connectors/pulsar/PulsarAvroTableSinkTest.java | 106 +++++++++++++
 .../connectors/pulsar/PulsarJsonTableSinkTest.java | 100 ++++++++++++
 .../src/test/resources/avro/NasaMission.avsc       |   2 +-
 13 files changed, 661 insertions(+), 7 deletions(-)

diff --git a/.gitignore b/.gitignore
index bd3c93e..4aef5a0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -80,5 +80,5 @@ docker.debug-info
 **/website/translated_docs*
 
 # Avro
-examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/avro/generated
-pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/avro/generated
+examples/flink-consumer-source/src/main/java/org/apache/flink/avro/generated
+pulsar-flink/src/test/java/org/apache/flink/avro/generated
diff --git a/examples/flink-consumer-source/pom.xml b/examples/flink-consumer-source/pom.xml
index 3c08697..35f5924 100644
--- a/examples/flink-consumer-source/pom.xml
+++ b/examples/flink-consumer-source/pom.xml
@@ -61,6 +61,18 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-flink</artifactId>
       <version>${project.version}</version>
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
index 553bf6c..ef0048c 100644
--- a/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkExample.java
@@ -22,8 +22,8 @@ package org.apache.flink.batch.connectors.pulsar.example;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
 
 import java.util.Arrays;
 import java.util.List;
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
new file mode 100644
index 0000000..7b78da5
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToAvroTableSink.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.avro.generated.WordWithCount;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarAvroTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics and writes the counts to an Avro table sink.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub --output-topic out_topic
+ */
+public class PulsarConsumerSourceWordCountToAvroTableSink {
+    // Avro field of WordWithCount used as the Pulsar routing key.
+    private static final String ROUTING_KEY = "word";
+
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", "flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+        env.setParallelism(parallelism);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+                .serviceUrl(serviceUrl)
+                .topic(inputTopic)
+                .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+        // split lines into words and sum the counts per word over 5 second windows
+        DataStream<WordWithCount> wc = input
+                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+                    for (String word : line.split("\\s")) {
+                        collector.collect(
+                                WordWithCount.newBuilder().setWord(word).setCount(1).build()
+                        );
+                    }
+                })
+                .returns(WordWithCount.class)
+                .keyBy("word")
+                .timeWindow(Time.seconds(5))
+                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                        WordWithCount.newBuilder().setWord(c1.getWord()).setCount(c1.getCount() + c2.getCount()).build()
+                );
+
+        tableEnvironment.registerDataStream("wc", wc);
+
+        Table table = tableEnvironment.sqlQuery("select * from wc");
+        if (null != outputTopic) {
+            PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY, WordWithCount.class);
+            table.writeToSink(sink);
+        } else {
+            // no output topic given: write the results to a CSV file instead
+            TableSink<?> sink = new CsvTableSink("./examples/file", "|");
+            table.writeToSink(sink);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+}
diff --git a/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
new file mode 100644
index 0000000..95b2536
--- /dev/null
+++ b/examples/flink-consumer-source/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCountToJsonTableSink.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar.example;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.pulsar.PulsarJsonTableSink;
+import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.sinks.CsvTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+/**
+ * Implements a streaming wordcount program on pulsar topics and writes the counts to a JSON table sink.
+ *
+ * <p>Example usage:
+ *   --service-url pulsar://localhost:6650 --input-topic test_topic --subscription test_sub --output-topic out_topic
+ */
+public class PulsarConsumerSourceWordCountToJsonTableSink {
+    // Field of WordWithCount used as the Pulsar routing key.
+    private static final String ROUTING_KEY = "word";
+
+    public static void main(String[] args) throws Exception {
+        // parse input arguments
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        if (parameterTool.getNumberOfParameters() < 2) {
+            System.out.println("Missing parameters!");
+            System.out.println("Usage: pulsar --service-url <pulsar-service-url> --input-topic <topic> --subscription <sub> --output-topic <topic>");
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().disableSysoutLogging();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
+        env.enableCheckpointing(5000);
+        env.getConfig().setGlobalJobParameters(parameterTool);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+
+        String serviceUrl = parameterTool.getRequired("service-url");
+        String inputTopic = parameterTool.getRequired("input-topic");
+        String subscription = parameterTool.get("subscription", "flink-examples");
+        String outputTopic = parameterTool.get("output-topic", null);
+        int parallelism = parameterTool.getInt("parallelism", 1);
+        env.setParallelism(parallelism);
+
+        System.out.println("Parameters:");
+        System.out.println("\tServiceUrl:\t" + serviceUrl);
+        System.out.println("\tInputTopic:\t" + inputTopic);
+        System.out.println("\tSubscription:\t" + subscription);
+        System.out.println("\tOutputTopic:\t" + outputTopic);
+        System.out.println("\tParallelism:\t" + parallelism);
+
+        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
+                .serviceUrl(serviceUrl)
+                .topic(inputTopic)
+                .subscriptionName(subscription);
+        SourceFunction<String> src = builder.build();
+        DataStream<String> input = env.addSource(src);
+
+        // split lines into words and sum the counts per word over 5 second windows
+        DataStream<WordWithCount> wc = input
+                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
+                    for (String word : line.split("\\s")) {
+                        collector.collect(
+                            new WordWithCount(word, 1)
+                        );
+                    }
+                })
+                .returns(WordWithCount.class)
+                .keyBy("word")
+                .timeWindow(Time.seconds(5))
+                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
+                        new WordWithCount(c1.word, c1.count + c2.count));
+
+        tableEnvironment.registerDataStream("wc", wc);
+
+        Table table = tableEnvironment.sqlQuery("select * from wc");
+        if (null != outputTopic) {
+            PulsarJsonTableSink sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new ProducerConfiguration(), ROUTING_KEY);
+            table.writeToSink(sink);
+        } else {
+            // no output topic given: write the results to a CSV file instead
+            TableSink<?> sink = new CsvTableSink("./examples/file", "|");
+            table.writeToSink(sink);
+        }
+
+        env.execute("Pulsar Stream WordCount");
+    }
+
+    /**
+     * Data type for words with count.
+     */
+    @AllArgsConstructor
+    @NoArgsConstructor
+    @ToString
+    public static class WordWithCount {
+
+        public String word;
+        public long count;
+    }
+}
diff --git a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
index 4a669e0..45adc98 100644
--- a/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
+++ b/examples/flink-consumer-source/src/main/resources/avro/NasaMission.avsc
@@ -1,4 +1,5 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+[
+{"namespace": "org.apache.flink.avro.generated",
  "type": "record",
  "name": "NasaMission",
  "fields": [
@@ -7,4 +8,13 @@
      {"name": "start_year",  "type": ["int", "null"]},
      {"name": "end_year", "type": ["int", "null"]}
  ]
+},
+{"namespace": "org.apache.flink.avro.generated",
+ "type": "record",
+ "name": "WordWithCount",
+ "fields": [
+     {"name": "word", "type": "string"},
+     {"name": "count", "type": "long"}
+ ]
 }
+]
diff --git a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
index 5f95611..0d255f2 100644
--- a/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
+++ b/examples/flink-consumer-source/src/main/scala/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchAvroSinkScalaExample.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.batch.connectors.pulsar.example
 
 import org.apache.flink.api.scala._
+import org.apache.flink.avro.generated.NasaMission
 import org.apache.flink.batch.connectors.pulsar.PulsarAvroOutputFormat
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission
 
 /**
   * Implements a batch Scala program on Pulsar topic by writing Flink DataSet as Avro.
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index 92eb045..74d3e8c 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -111,6 +111,10 @@
         <directory>src/main/resources</directory>
         <filtering>true</filtering>
       </resource>
+      <resource>
+        <directory>src/test/resources</directory>
+        <filtering>true</filtering>
+      </resource>
     </resources>
 
     <plugins>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
new file mode 100644
index 0000000..9187a0d
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
+ *
+ * <p>The routing key field must exist both in the table and, as a {@code STRING}, in the Avro record class.
+ */
+public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
+
+    protected final String serviceUrl;
+    protected final String topic;
+    protected final ProducerConfiguration producerConf;
+    protected final String routingKeyFieldName;
+    protected SerializationSchema<Row> serializationSchema;
+    protected String[] fieldNames;
+    protected TypeInformation<?>[] fieldTypes;
+    protected PulsarKeyExtractor<Row> keyExtractor;
+    private final Class<? extends SpecificRecord> recordClazz;
+
+    /**
+     * Create PulsarAvroTableSink.
+     *
+     * @param serviceUrl          pulsar service url
+     * @param topic               topic in pulsar to which table is written
+     * @param producerConf        producer configuration
+     * @param routingKeyFieldName routing key field name
+     * @param recordClazz         generated Avro record class describing the rows written to the topic
+     */
+    public PulsarAvroTableSink(
+            String serviceUrl,
+            String topic,
+            ProducerConfiguration producerConf,
+            String routingKeyFieldName,
+            Class<? extends SpecificRecord> recordClazz) {
+        this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
+        this.topic = checkNotNull(topic, "Topic is null");
+        this.producerConf = checkNotNull(producerConf, "Producer configuration not set");
+        this.routingKeyFieldName = routingKeyFieldName;
+        this.recordClazz = checkNotNull(recordClazz, "Avro record class not set");
+    }
+
+    /**
+     * Returns the low-level producer, (re)creating the Avro serialization schema for the record class.
+     */
+    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
+        serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        return new FlinkPulsarProducer<Row>(
+                serviceUrl,
+                topic,
+                serializationSchema,
+                producerConf,
+                keyExtractor);
+    }
+
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        checkState(fieldNames != null, "Table sink is not configured");
+        checkState(fieldTypes != null, "Table sink is not configured");
+        checkState(serializationSchema != null, "Table sink is not configured");
+        checkState(keyExtractor != null, "Table sink is not configured");
+        FlinkPulsarProducer<Row> producer = createFlinkPulsarProducer();
+        dataStream.addSink(producer);
+    }
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(fieldTypes, fieldNames);
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, producerConf, routingKeyFieldName, recordClazz);
+
+        sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
+        sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
+        checkArgument(fieldNames.length == fieldTypes.length,
+                "Number of provided field names and types do not match");
+
+        sink.serializationSchema = new AvroRowSerializationSchema(recordClazz);
+        sink.keyExtractor = new AvroKeyExtractor(
+                routingKeyFieldName,
+                fieldNames,
+                fieldTypes,
+                recordClazz);
+
+        return sink;
+    }
+
+    /**
+     * A key extractor that extracts the routing key from a {@link Row} by field name.
+     */
+    private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
+        private final int keyIndex;
+
+        public AvroKeyExtractor(
+                String keyFieldName,
+                String[] fieldNames,
+                TypeInformation<?>[] fieldTypes,
+                Class<? extends SpecificRecord> recordClazz) {
+            checkArgument(fieldNames.length == fieldTypes.length,
+                    "Number of provided field names and types does not match.");
+
+            int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
+            checkArgument(keyIndex >= 0,
+                    "Key field '" + keyFieldName + "' not found");
+
+            // Schema.getField returns null for an unknown name; check it before dereferencing to avoid an NPE.
+            Schema.Field keyField = SpecificData.get().getSchema(recordClazz).getField(keyFieldName);
+            checkArgument(keyField != null,
+                    "Key field '" + keyFieldName + "' not found in Avro schema of " + recordClazz.getName());
+            checkArgument(Schema.Type.STRING.equals(keyField.schema().getType()),
+                    "Key field must be of type 'STRING'");
+            this.keyIndex = keyIndex;
+        }
+
+        @Override
+        public String getKey(Row event) {
+            return (String) event.getField(keyIndex);
+        }
+    }
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
index 176acc4..4f8f0c6 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchemaTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar.serialization;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.batch.connectors.pulsar.avro.generated.NasaMission;
+import org.apache.flink.avro.generated.NasaMission;
 import org.apache.flink.formats.avro.AvroDeserializationSchema;
 import org.junit.Test;
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
new file mode 100644
index 0000000..ae336ff
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.avro.generated.NasaMission;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarAvroTableSink}.
+ */
+public class PulsarAvroTableSinkTest {
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "name";
+
+    private final String[] fieldNames = {"id", "name", "start_year", "end_year"};
+    private final TypeInformation[] typeInformations = {
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(String.class),
+            TypeInformation.of(Integer.class),
+            TypeInformation.of(Integer.class)
+    };
+
+    /**
+     * Test configure PulsarAvroTableSink.
+     *
+     * @throws Exception if the sink cannot be set up
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarAvroTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).keyExtractor);
+        Assert.assertNotNull(((PulsarAvroTableSink) configuredSink).serializationSchema);
+    }
+
+    /**
+     * Test emit data stream.
+     *
+     * @throws Exception if the sink cannot be set up
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarAvroTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+    /**
+     * Creates a sink whose configured state is injected directly via reflection, bypassing {@code configure}.
+     */
+    private PulsarAvroTableSink spySink() throws Exception {
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY, NasaMission.class);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        // NOTE(review): whenNew only intercepts constructor calls under PowerMockRunner with @PrepareForTest.
+        PowerMockito.whenNew(
+                FlinkPulsarProducer.class
+        ).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
new file mode 100644
index 0000000..746e87e
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * Unit test of {@link PulsarJsonTableSink}.
+ */
+public class PulsarJsonTableSinkTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+    private static final String TOPIC_NAME = "test_topic";
+    private static final String ROUTING_KEY = "key";
+    private final String[] fieldNames = {"key", "value"};
+    // Wildcard-typed to match TableSink#configure(String[], TypeInformation<?>[]) without raw types.
+    private final TypeInformation<?>[] typeInformations = {
+            TypeInformation.of(String.class),
+            TypeInformation.of(String.class)
+    };
+
+    /**
+     * Tests that configuring the sink yields a sink that carries the given field names and
+     * types, along with a non-null key extractor and serialization schema.
+     *
+     * @throws Exception if creating or configuring the sink fails
+     */
+    @Test
+    public void testConfigure() throws Exception {
+        PulsarJsonTableSink sink = spySink();
+
+        TableSink<Row> configuredSink = sink.configure(fieldNames, typeInformations);
+
+        Assert.assertArrayEquals(fieldNames, configuredSink.getFieldNames());
+        Assert.assertArrayEquals(typeInformations, configuredSink.getFieldTypes());
+        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).keyExtractor);
+        Assert.assertNotNull(((PulsarJsonTableSink) configuredSink).serializationSchema);
+    }
+
+    /**
+     * Tests that emitting a data stream attaches a {@link FlinkPulsarProducer} sink to it.
+     *
+     * @throws Exception if creating the sink fails
+     */
+    @Test
+    public void testEmitDataStream() throws Exception {
+        DataStream mockedDataStream = Mockito.mock(DataStream.class);
+
+        PulsarJsonTableSink sink = spySink();
+
+        sink.emitDataStream(mockedDataStream);
+
+        Mockito.verify(mockedDataStream).addSink(Mockito.any(FlinkPulsarProducer.class));
+    }
+
+    /**
+     * Creates a {@link PulsarJsonTableSink} whose field names, field types, serialization
+     * schema and key extractor are injected directly via reflection.
+     *
+     * <p>NOTE(review): {@code PowerMockito.whenNew} only intercepts constructor calls made from
+     * classes prepared with {@code @PrepareForTest} and run under {@code PowerMockRunner}. This
+     * class declares neither annotation, so the constructor stub below is presumably inert.
+     *
+     * @return the pre-populated sink under test
+     * @throws Exception if stubbing the producer constructor or injecting state fails
+     */
+    private PulsarJsonTableSink spySink() throws Exception {
+        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, new ProducerConfiguration(), ROUTING_KEY);
+        FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
+        // Matchers mirror the expected FlinkPulsarProducer constructor arguments: service URL,
+        // topic, serialization schema, producer configuration, key extractor. The fourth
+        // matcher previously named PowerMockito.class, which is not a constructor argument type.
+        PowerMockito.whenNew(
+                FlinkPulsarProducer.class
+        ).withArguments(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(SerializationSchema.class),
+                Mockito.any(ProducerConfiguration.class),
+                Mockito.any(PulsarKeyExtractor.class)
+        ).thenReturn(producer);
+        Whitebox.setInternalState(sink, "fieldNames", fieldNames);
+        Whitebox.setInternalState(sink, "fieldTypes", typeInformations);
+        Whitebox.setInternalState(sink, "serializationSchema", Mockito.mock(SerializationSchema.class));
+        Whitebox.setInternalState(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class));
+        return sink;
+    }
+}
diff --git a/pulsar-flink/src/test/resources/avro/NasaMission.avsc b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
index 4a669e0..521f475 100644
--- a/pulsar-flink/src/test/resources/avro/NasaMission.avsc
+++ b/pulsar-flink/src/test/resources/avro/NasaMission.avsc
@@ -1,4 +1,4 @@
-{"namespace": "org.apache.flink.batch.connectors.pulsar.avro.generated",
+{"namespace": "org.apache.flink.avro.generated",
  "type": "record",
  "name": "NasaMission",
  "fields": [